001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.TimeUnit; 030import java.util.regex.Matcher; 031import java.util.regex.Pattern; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.AuthUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ZooKeeperConnectionException; 037import org.apache.hadoop.hbase.security.Superusers; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.apache.zookeeper.KeeperException; 044import org.apache.zookeeper.WatchedEvent; 045import org.apache.zookeeper.Watcher; 046import org.apache.zookeeper.ZooDefs.Ids; 047import org.apache.zookeeper.ZooDefs.Perms; 048import org.apache.zookeeper.data.ACL; 049import org.apache.zookeeper.data.Id; 050import org.apache.zookeeper.data.Stat; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated 056 * for each Master, RegionServer, and client process. 057 * 058 * <p>This is the only class that implements {@link Watcher}. Other internal 059 * classes which need to be notified of ZooKeeper events must register with 060 * the local instance of this watcher via {@link #registerListener}. 061 * 062 * <p>This class also holds and manages the connection to ZooKeeper. Code to 063 * deal with connection related events and exceptions are handled here. 064 */ 065@InterfaceAudience.Private 066public class ZKWatcher implements Watcher, Abortable, Closeable { 067 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 068 069 // Identifier for this watcher (for logging only). It is made of the prefix 070 // passed on construction and the zookeeper sessionid. 071 private String prefix; 072 private String identifier; 073 074 // zookeeper quorum 075 private String quorum; 076 077 // zookeeper connection 078 private final RecoverableZooKeeper recoverableZooKeeper; 079 080 // abortable in case of zk failure 081 protected Abortable abortable; 082 // Used if abortable is null 083 private boolean aborted = false; 084 085 private final ZNodePaths znodePaths; 086 087 // listeners to be notified 088 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 089 090 // Single threaded executor pool that processes event notifications from Zookeeper. Events are 091 // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do 092 // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. 093 // EventThread internally runs a single while loop to serially process all the events. When events 094 // are processed by the listeners in the same thread, that blocks the EventThread from processing 095 // subsequent events. Processing events in a separate thread frees up the event thread to continue 096 // and further prevents deadlocks if the process method itself makes other zookeeper calls. 097 // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the 098 // requests using a single while loop and hence there is no performance degradation. 099 private final ExecutorService zkEventProcessor = Executors.newSingleThreadExecutor( 100 new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d") 101 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 102 103 private final Configuration conf; 104 105 private final long zkSyncTimeout; 106 107 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 108 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 109 110 /** 111 * Instantiate a ZooKeeper connection and watcher. 112 * @param identifier string that is passed to RecoverableZookeeper to be used as 113 * identifier for this instance. Use null for default. 114 * @throws IOException if the connection to ZooKeeper fails 115 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 116 */ 117 public ZKWatcher(Configuration conf, String identifier, 118 Abortable abortable) throws ZooKeeperConnectionException, IOException { 119 this(conf, identifier, abortable, false); 120 } 121 122 /** 123 * Instantiate a ZooKeeper connection and watcher. 124 * @param conf the configuration to use 125 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 126 * this instance. Use null for default. 127 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 128 * context. 129 * @param canCreateBaseZNode true if a base ZNode can be created 130 * @throws IOException if the connection to ZooKeeper fails 131 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 132 */ 133 public ZKWatcher(Configuration conf, String identifier, 134 Abortable abortable, boolean canCreateBaseZNode) 135 throws IOException, ZooKeeperConnectionException { 136 this(conf, identifier, abortable, canCreateBaseZNode, false); 137 } 138 139 /** 140 * Instantiate a ZooKeeper connection and watcher. 141 * @param conf the configuration to use 142 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 143 * this instance. Use null for default. 144 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 145 * context. 146 * @param canCreateBaseZNode true if a base ZNode can be created 147 * @param clientZK whether this watcher is set to access client ZK 148 * @throws IOException if the connection to ZooKeeper fails 149 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 150 * ZNodes 151 */ 152 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 153 boolean canCreateBaseZNode, boolean clientZK) 154 throws IOException, ZooKeeperConnectionException { 155 this.conf = conf; 156 if (clientZK) { 157 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 158 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 159 if (clientZkQuorumServers != null) { 160 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 161 // Don't allow same settings to avoid dead loop when master trying 162 // to sync meta information from server ZK to client ZK 163 throw new IllegalArgumentException( 164 "The quorum settings for client ZK should be different from those for server"); 165 } 166 this.quorum = clientZkQuorumServers; 167 } else { 168 this.quorum = serverZkQuorumServers; 169 } 170 } else { 171 this.quorum = ZKConfig.getZKQuorumServersString(conf); 172 } 173 this.prefix = identifier; 174 // Identifier will get the sessionid appended later below down when we 175 // handle the syncconnect event. 176 this.identifier = identifier + "0x0"; 177 this.abortable = abortable; 178 this.znodePaths = new ZNodePaths(conf); 179 PendingWatcher pendingWatcher = new PendingWatcher(); 180 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); 181 pendingWatcher.prepare(this); 182 if (canCreateBaseZNode) { 183 try { 184 createBaseZNodes(); 185 } catch (ZooKeeperConnectionException zce) { 186 try { 187 this.recoverableZooKeeper.close(); 188 } catch (InterruptedException ie) { 189 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 190 Thread.currentThread().interrupt(); 191 } 192 throw zce; 193 } 194 } 195 this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, 196 HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); 197 } 198 199 private void createBaseZNodes() throws ZooKeeperConnectionException { 200 try { 201 // Create all the necessary "directories" of znodes 202 ZKUtil.createWithParents(this, znodePaths.baseZNode); 203 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 204 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 205 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 206 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 207 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 208 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 209 } catch (KeeperException e) { 210 throw new ZooKeeperConnectionException( 211 prefix("Unexpected KeeperException creating base node"), e); 212 } 213 } 214 215 /** 216 * On master start, we check the znode ACLs under the root directory and set the ACLs properly 217 * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed 218 * so that the existing znodes created with open permissions are now changed with restrictive 219 * perms. 220 */ 221 public void checkAndSetZNodeAcls() { 222 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) { 223 LOG.info("not a secure deployment, proceeding"); 224 return; 225 } 226 227 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 228 // correct. 229 try { 230 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 231 232 if (!isBaseZnodeAclSetup(actualAcls)) { 233 LOG.info("setting znode ACLs"); 234 setZnodeAclsRecursive(znodePaths.baseZNode); 235 } 236 } catch(KeeperException.NoNodeException nne) { 237 return; 238 } catch(InterruptedException ie) { 239 interruptedExceptionNoThrow(ie, false); 240 } catch (IOException|KeeperException e) { 241 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 242 } 243 } 244 245 /** 246 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs 247 * will be set last in case the master fails in between. 248 * @param znode the ZNode to set the permissions for 249 */ 250 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 251 List<String> children = recoverableZooKeeper.getChildren(znode, false); 252 253 for (String child : children) { 254 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 255 } 256 List<ACL> acls = ZKUtil.createACL(this, znode, true); 257 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 258 recoverableZooKeeper.setAcl(znode, acls, -1); 259 } 260 261 /** 262 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 263 * @param acls acls from zookeeper 264 * @return whether ACLs are set for the base znode 265 * @throws IOException if getting the current user fails 266 */ 267 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 268 if (LOG.isDebugEnabled()) { 269 LOG.debug("Checking znode ACLs"); 270 } 271 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 272 // Check whether ACL set for all superusers 273 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 274 return false; 275 } 276 277 // this assumes that current authenticated user is the same as zookeeper client user 278 // configured via JAAS 279 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 280 281 if (acls.isEmpty()) { 282 if (LOG.isDebugEnabled()) { 283 LOG.debug("ACL is empty"); 284 } 285 return false; 286 } 287 288 for (ACL acl : acls) { 289 int perms = acl.getPerms(); 290 Id id = acl.getId(); 291 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 292 // and one for the hbase user 293 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 294 if (perms != Perms.READ) { 295 if (LOG.isDebugEnabled()) { 296 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 297 id, perms, Perms.READ)); 298 } 299 return false; 300 } 301 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 302 if (perms != Perms.ALL) { 303 if (LOG.isDebugEnabled()) { 304 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 305 id, perms, Perms.ALL)); 306 } 307 return false; 308 } 309 } else if ("sasl".equals(id.getScheme())) { 310 String name = id.getId(); 311 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 312 Matcher match = NAME_PATTERN.matcher(name); 313 if (match.matches()) { 314 name = match.group(1); 315 } 316 if (name.equals(hbaseUser)) { 317 if (perms != Perms.ALL) { 318 if (LOG.isDebugEnabled()) { 319 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 320 id, perms, Perms.ALL)); 321 } 322 return false; 323 } 324 } else { 325 if (LOG.isDebugEnabled()) { 326 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 327 } 328 return false; 329 } 330 } else { 331 if (LOG.isDebugEnabled()) { 332 LOG.debug("unexpected ACL id '{}'", id); 333 } 334 return false; 335 } 336 } 337 return true; 338 } 339 340 /* 341 * Validate whether ACL set for all superusers. 342 */ 343 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 344 for (String user : superUsers) { 345 boolean hasAccess = false; 346 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 347 if (!AuthUtil.isGroupPrincipal(user)) { 348 for (ACL acl : acls) { 349 if (user.equals(acl.getId().getId())) { 350 if (acl.getPerms() == Perms.ALL) { 351 hasAccess = true; 352 } else { 353 if (LOG.isDebugEnabled()) { 354 LOG.debug(String.format( 355 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 356 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 357 } 358 } 359 break; 360 } 361 } 362 if (!hasAccess) { 363 return false; 364 } 365 } 366 } 367 return true; 368 } 369 370 /* 371 * Validate whether ACL ID is superuser. 372 */ 373 public static boolean isSuperUserId(String[] superUsers, Id id) { 374 for (String user : superUsers) { 375 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 376 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 377 return true; 378 } 379 } 380 return false; 381 } 382 383 @Override 384 public String toString() { 385 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 386 } 387 388 /** 389 * Adds this instance's identifier as a prefix to the passed <code>str</code> 390 * @param str String to amend. 391 * @return A new string with this instance's identifier as prefix: e.g. 392 * if passed 'hello world', the returned string could be 393 */ 394 public String prefix(final String str) { 395 return this.toString() + " " + str; 396 } 397 398 /** 399 * Get the znodes corresponding to the meta replicas from ZK 400 * @return list of znodes 401 * @throws KeeperException if a ZooKeeper operation fails 402 */ 403 public List<String> getMetaReplicaNodes() throws KeeperException { 404 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 405 return filterMetaReplicaNodes(childrenOfBaseNode); 406 } 407 408 /** 409 * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode 410 * for subsequent CREATE/DELETE operations on child nodes. 411 */ 412 public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException { 413 List<String> childrenOfBaseNode = 414 ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode); 415 return filterMetaReplicaNodes(childrenOfBaseNode); 416 } 417 418 /** 419 * @param nodes Input list of znodes 420 * @return Filtered list of znodes from nodes that belong to meta replica(s). 421 */ 422 private List<String> filterMetaReplicaNodes(List<String> nodes) { 423 if (nodes == null || nodes.isEmpty()) { 424 return new ArrayList<>(); 425 } 426 List<String> metaReplicaNodes = new ArrayList<>(2); 427 String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX); 428 for (String child : nodes) { 429 if (child.startsWith(pattern)) { 430 metaReplicaNodes.add(child); 431 } 432 } 433 return metaReplicaNodes; 434 } 435 436 /** 437 * Register the specified listener to receive ZooKeeper events. 438 * @param listener the listener to register 439 */ 440 public void registerListener(ZKListener listener) { 441 listeners.add(listener); 442 } 443 444 /** 445 * Register the specified listener to receive ZooKeeper events and add it as 446 * the first in the list of current listeners. 447 * @param listener the listener to register 448 */ 449 public void registerListenerFirst(ZKListener listener) { 450 listeners.add(0, listener); 451 } 452 453 public void unregisterListener(ZKListener listener) { 454 listeners.remove(listener); 455 } 456 457 /** 458 * Clean all existing listeners 459 */ 460 public void unregisterAllListeners() { 461 listeners.clear(); 462 } 463 464 /** 465 * Get a copy of current registered listeners 466 */ 467 public List<ZKListener> getListeners() { 468 return new ArrayList<>(listeners); 469 } 470 471 /** 472 * @return The number of currently registered listeners 473 */ 474 public int getNumberOfListeners() { 475 return listeners.size(); 476 } 477 478 /** 479 * Get the connection to ZooKeeper. 480 * @return connection reference to zookeeper 481 */ 482 public RecoverableZooKeeper getRecoverableZooKeeper() { 483 return recoverableZooKeeper; 484 } 485 486 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 487 recoverableZooKeeper.reconnectAfterExpiration(); 488 } 489 490 /** 491 * Get the quorum address of this instance. 492 * @return quorum string of this zookeeper connection instance 493 */ 494 public String getQuorum() { 495 return quorum; 496 } 497 498 /** 499 * Get the znodePaths. 500 * <p> 501 * Mainly used for mocking as mockito can not mock a field access. 502 */ 503 public ZNodePaths getZNodePaths() { 504 return znodePaths; 505 } 506 507 private void processEvent(WatchedEvent event) { 508 switch(event.getType()) { 509 // If event type is NONE, this is a connection status change 510 case None: { 511 connectionEvent(event); 512 break; 513 } 514 515 // Otherwise pass along to the listeners 516 case NodeCreated: { 517 for(ZKListener listener : listeners) { 518 listener.nodeCreated(event.getPath()); 519 } 520 break; 521 } 522 523 case NodeDeleted: { 524 for(ZKListener listener : listeners) { 525 listener.nodeDeleted(event.getPath()); 526 } 527 break; 528 } 529 530 case NodeDataChanged: { 531 for(ZKListener listener : listeners) { 532 listener.nodeDataChanged(event.getPath()); 533 } 534 break; 535 } 536 537 case NodeChildrenChanged: { 538 for(ZKListener listener : listeners) { 539 listener.nodeChildrenChanged(event.getPath()); 540 } 541 break; 542 } 543 default: 544 LOG.error("Invalid event of type {} received for path {}. Ignoring.", 545 event.getState(), event.getPath()); 546 } 547 } 548 549 /** 550 * Method called from ZooKeeper for events and connection status. 551 * <p> 552 * Valid events are passed along to listeners. Connection status changes 553 * are dealt with locally. 554 */ 555 @Override 556 public void process(WatchedEvent event) { 557 LOG.debug(prefix("Received ZooKeeper Event, " + 558 "type=" + event.getType() + ", " + 559 "state=" + event.getState() + ", " + 560 "path=" + event.getPath())); 561 zkEventProcessor.submit(() -> processEvent(event)); 562 } 563 564 // Connection management 565 566 /** 567 * Called when there is a connection-related event via the Watcher callback. 568 * <p> 569 * If Disconnected or Expired, this should shutdown the cluster. But, since 570 * we send a KeeperException.SessionExpiredException along with the abort 571 * call, it's possible for the Abortable to catch it and try to create a new 572 * session with ZooKeeper. This is what the client does in HCM. 573 * <p> 574 * @param event the connection-related event 575 */ 576 private void connectionEvent(WatchedEvent event) { 577 switch(event.getState()) { 578 case SyncConnected: 579 this.identifier = this.prefix + "-0x" + 580 Long.toHexString(this.recoverableZooKeeper.getSessionId()); 581 // Update our identifier. Otherwise ignore. 582 LOG.debug("{} connected", this.identifier); 583 break; 584 585 // Abort the server if Disconnected or Expired 586 case Disconnected: 587 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 588 break; 589 590 case Closed: 591 LOG.debug(prefix("ZooKeeper client closed, ignoring")); 592 break; 593 594 case Expired: 595 String msg = prefix(this.identifier + " received expired from " + 596 "ZooKeeper, aborting"); 597 // TODO: One thought is to add call to ZKListener so say, 598 // ZKNodeTracker can zero out its data values. 599 if (this.abortable != null) { 600 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 601 } 602 break; 603 604 case ConnectedReadOnly: 605 case SaslAuthenticated: 606 case AuthFailed: 607 break; 608 609 default: 610 throw new IllegalStateException("Received event is not valid: " + event.getState()); 611 } 612 } 613 614 /** 615 * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a 616 * timeout lets the callers fail-fast rather than wait forever for the sync to finish. 617 * <p> 618 * Executing this method before running other methods will ensure that the 619 * subsequent operations are up-to-date and consistent as of the time that 620 * the sync is complete. 621 * <p> 622 * This is used for compareAndSwap type operations where we need to read the 623 * data of an existing node and delete or transition that node, utilizing the 624 * previously read version and data. We want to ensure that the version read 625 * is up-to-date from when we begin the operation. 626 * <p> 627 */ 628 public void syncOrTimeout(String path) throws KeeperException { 629 final CountDownLatch latch = new CountDownLatch(1); 630 long startTime = EnvironmentEdgeManager.currentTime(); 631 this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); 632 try { 633 if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { 634 LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " 635 + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); 636 throw new KeeperException.RequestTimeoutException(); 637 } 638 } catch (InterruptedException e) { 639 LOG.warn("Interrupted waiting for ZK sync() to finish.", e); 640 Thread.currentThread().interrupt(); 641 return; 642 } 643 if (LOG.isDebugEnabled()) { 644 // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a 645 // useful metric to have since the latency of sync() impacts the callers. 646 LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); 647 } 648 } 649 650 /** 651 * Handles KeeperExceptions in client calls. 652 * <p> 653 * This may be temporary but for now this gives one place to deal with these. 654 * <p> 655 * TODO: Currently this method rethrows the exception to let the caller handle 656 * <p> 657 * @param ke the exception to rethrow 658 * @throws KeeperException if a ZooKeeper operation fails 659 */ 660 public void keeperException(KeeperException ke) throws KeeperException { 661 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 662 throw ke; 663 } 664 665 /** 666 * Handles InterruptedExceptions in client calls. 667 * @param ie the InterruptedException instance thrown 668 * @throws KeeperException the exception to throw, transformed from the InterruptedException 669 */ 670 public void interruptedException(InterruptedException ie) throws KeeperException { 671 interruptedExceptionNoThrow(ie, true); 672 // Throw a system error exception to let upper level handle it 673 KeeperException keeperException = new KeeperException.SystemErrorException(); 674 keeperException.initCause(ie); 675 throw keeperException; 676 } 677 678 /** 679 * Log the InterruptedException and interrupt current thread 680 * @param ie The IterruptedException to log 681 * @param throwLater Whether we will throw the exception latter 682 */ 683 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 684 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 685 + (throwLater ? " and rethrow a SystemErrorException" : "")), 686 ie); 687 // At least preserve interrupt. 688 Thread.currentThread().interrupt(); 689 } 690 691 /** 692 * Close the connection to ZooKeeper. 693 * 694 */ 695 @Override 696 public void close() { 697 try { 698 recoverableZooKeeper.close(); 699 } catch (InterruptedException e) { 700 Thread.currentThread().interrupt(); 701 } finally { 702 zkEventProcessor.shutdownNow(); 703 } 704 } 705 706 public Configuration getConfiguration() { 707 return conf; 708 } 709 710 @Override 711 public void abort(String why, Throwable e) { 712 if (this.abortable != null) { 713 this.abortable.abort(why, e); 714 } else { 715 this.aborted = true; 716 } 717 } 718 719 @Override 720 public boolean isAborted() { 721 return this.abortable == null? this.aborted: this.abortable.isAborted(); 722 } 723}