001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.TimeUnit; 030import java.util.regex.Matcher; 031import java.util.regex.Pattern; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.AuthUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ZooKeeperConnectionException; 037import org.apache.hadoop.hbase.security.Superusers; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.zookeeper.KeeperException; 043import org.apache.zookeeper.WatchedEvent; 044import org.apache.zookeeper.Watcher; 045import org.apache.zookeeper.ZooDefs.Ids; 046import org.apache.zookeeper.ZooDefs.Perms; 047import org.apache.zookeeper.data.ACL; 048import org.apache.zookeeper.data.Id; 049import org.apache.zookeeper.data.Stat; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated 055 * for each Master, RegionServer, and client process. 056 * 057 * <p>This is the only class that implements {@link Watcher}. Other internal 058 * classes which need to be notified of ZooKeeper events must register with 059 * the local instance of this watcher via {@link #registerListener}. 060 * 061 * <p>This class also holds and manages the connection to ZooKeeper. Code to 062 * deal with connection related events and exceptions are handled here. 063 */ 064@InterfaceAudience.Private 065public class ZKWatcher implements Watcher, Abortable, Closeable { 066 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 067 068 // Identifier for this watcher (for logging only). It is made of the prefix 069 // passed on construction and the zookeeper sessionid. 070 private String prefix; 071 private String identifier; 072 073 // zookeeper quorum 074 private String quorum; 075 076 // zookeeper connection 077 private final RecoverableZooKeeper recoverableZooKeeper; 078 079 // abortable in case of zk failure 080 protected Abortable abortable; 081 // Used if abortable is null 082 private boolean aborted = false; 083 084 private final ZNodePaths znodePaths; 085 086 // listeners to be notified 087 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 088 089 // Single threaded executor pool that processes event notifications from Zookeeper. Events are 090 // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do 091 // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. 092 // EventThread internally runs a single while loop to serially process all the events. When events 093 // are processed by the listeners in the same thread, that blocks the EventThread from processing 094 // subsequent events. Processing events in a separate thread frees up the event thread to continue 095 // and further prevents deadlocks if the process method itself makes other zookeeper calls. 096 // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the 097 // requests using a single while loop and hence there is no performance degradation. 098 private final ExecutorService zkEventProcessor = 099 Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor")); 100 101 private final Configuration conf; 102 103 private final long zkSyncTimeout; 104 105 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 106 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 107 108 /** 109 * Instantiate a ZooKeeper connection and watcher. 110 * @param identifier string that is passed to RecoverableZookeeper to be used as 111 * identifier for this instance. Use null for default. 112 * @throws IOException if the connection to ZooKeeper fails 113 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 114 */ 115 public ZKWatcher(Configuration conf, String identifier, 116 Abortable abortable) throws ZooKeeperConnectionException, IOException { 117 this(conf, identifier, abortable, false); 118 } 119 120 /** 121 * Instantiate a ZooKeeper connection and watcher. 122 * @param conf the configuration to use 123 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 124 * this instance. Use null for default. 125 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 126 * context. 127 * @param canCreateBaseZNode true if a base ZNode can be created 128 * @throws IOException if the connection to ZooKeeper fails 129 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 130 */ 131 public ZKWatcher(Configuration conf, String identifier, 132 Abortable abortable, boolean canCreateBaseZNode) 133 throws IOException, ZooKeeperConnectionException { 134 this(conf, identifier, abortable, canCreateBaseZNode, false); 135 } 136 137 /** 138 * Instantiate a ZooKeeper connection and watcher. 139 * @param conf the configuration to use 140 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 141 * this instance. Use null for default. 142 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 143 * context. 144 * @param canCreateBaseZNode true if a base ZNode can be created 145 * @param clientZK whether this watcher is set to access client ZK 146 * @throws IOException if the connection to ZooKeeper fails 147 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 148 * ZNodes 149 */ 150 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 151 boolean canCreateBaseZNode, boolean clientZK) 152 throws IOException, ZooKeeperConnectionException { 153 this.conf = conf; 154 if (clientZK) { 155 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 156 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 157 if (clientZkQuorumServers != null) { 158 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 159 // Don't allow same settings to avoid dead loop when master trying 160 // to sync meta information from server ZK to client ZK 161 throw new IllegalArgumentException( 162 "The quorum settings for client ZK should be different from those for server"); 163 } 164 this.quorum = clientZkQuorumServers; 165 } else { 166 this.quorum = serverZkQuorumServers; 167 } 168 } else { 169 this.quorum = ZKConfig.getZKQuorumServersString(conf); 170 } 171 this.prefix = identifier; 172 // Identifier will get the sessionid appended later below down when we 173 // handle the syncconnect event. 174 this.identifier = identifier + "0x0"; 175 this.abortable = abortable; 176 this.znodePaths = new ZNodePaths(conf); 177 PendingWatcher pendingWatcher = new PendingWatcher(); 178 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); 179 pendingWatcher.prepare(this); 180 if (canCreateBaseZNode) { 181 try { 182 createBaseZNodes(); 183 } catch (ZooKeeperConnectionException zce) { 184 try { 185 this.recoverableZooKeeper.close(); 186 } catch (InterruptedException ie) { 187 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 188 Thread.currentThread().interrupt(); 189 } 190 throw zce; 191 } 192 } 193 this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, 194 HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); 195 } 196 197 private void createBaseZNodes() throws ZooKeeperConnectionException { 198 try { 199 // Create all the necessary "directories" of znodes 200 ZKUtil.createWithParents(this, znodePaths.baseZNode); 201 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 202 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 203 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 204 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 205 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 206 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 207 } catch (KeeperException e) { 208 throw new ZooKeeperConnectionException( 209 prefix("Unexpected KeeperException creating base node"), e); 210 } 211 } 212 213 /** 214 * On master start, we check the znode ACLs under the root directory and set the ACLs properly 215 * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed 216 * so that the existing znodes created with open permissions are now changed with restrictive 217 * perms. 218 */ 219 public void checkAndSetZNodeAcls() { 220 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) { 221 LOG.info("not a secure deployment, proceeding"); 222 return; 223 } 224 225 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 226 // correct. 227 try { 228 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 229 230 if (!isBaseZnodeAclSetup(actualAcls)) { 231 LOG.info("setting znode ACLs"); 232 setZnodeAclsRecursive(znodePaths.baseZNode); 233 } 234 } catch(KeeperException.NoNodeException nne) { 235 return; 236 } catch(InterruptedException ie) { 237 interruptedExceptionNoThrow(ie, false); 238 } catch (IOException|KeeperException e) { 239 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 240 } 241 } 242 243 /** 244 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs 245 * will be set last in case the master fails in between. 246 * @param znode the ZNode to set the permissions for 247 */ 248 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 249 List<String> children = recoverableZooKeeper.getChildren(znode, false); 250 251 for (String child : children) { 252 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 253 } 254 List<ACL> acls = ZKUtil.createACL(this, znode, true); 255 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 256 recoverableZooKeeper.setAcl(znode, acls, -1); 257 } 258 259 /** 260 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 261 * @param acls acls from zookeeper 262 * @return whether ACLs are set for the base znode 263 * @throws IOException if getting the current user fails 264 */ 265 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 266 if (LOG.isDebugEnabled()) { 267 LOG.debug("Checking znode ACLs"); 268 } 269 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 270 // Check whether ACL set for all superusers 271 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 272 return false; 273 } 274 275 // this assumes that current authenticated user is the same as zookeeper client user 276 // configured via JAAS 277 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 278 279 if (acls.isEmpty()) { 280 if (LOG.isDebugEnabled()) { 281 LOG.debug("ACL is empty"); 282 } 283 return false; 284 } 285 286 for (ACL acl : acls) { 287 int perms = acl.getPerms(); 288 Id id = acl.getId(); 289 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 290 // and one for the hbase user 291 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 292 if (perms != Perms.READ) { 293 if (LOG.isDebugEnabled()) { 294 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 295 id, perms, Perms.READ)); 296 } 297 return false; 298 } 299 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 300 if (perms != Perms.ALL) { 301 if (LOG.isDebugEnabled()) { 302 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 303 id, perms, Perms.ALL)); 304 } 305 return false; 306 } 307 } else if ("sasl".equals(id.getScheme())) { 308 String name = id.getId(); 309 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 310 Matcher match = NAME_PATTERN.matcher(name); 311 if (match.matches()) { 312 name = match.group(1); 313 } 314 if (name.equals(hbaseUser)) { 315 if (perms != Perms.ALL) { 316 if (LOG.isDebugEnabled()) { 317 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 318 id, perms, Perms.ALL)); 319 } 320 return false; 321 } 322 } else { 323 if (LOG.isDebugEnabled()) { 324 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 325 } 326 return false; 327 } 328 } else { 329 if (LOG.isDebugEnabled()) { 330 LOG.debug("unexpected ACL id '{}'", id); 331 } 332 return false; 333 } 334 } 335 return true; 336 } 337 338 /* 339 * Validate whether ACL set for all superusers. 340 */ 341 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 342 for (String user : superUsers) { 343 boolean hasAccess = false; 344 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 345 if (!AuthUtil.isGroupPrincipal(user)) { 346 for (ACL acl : acls) { 347 if (user.equals(acl.getId().getId())) { 348 if (acl.getPerms() == Perms.ALL) { 349 hasAccess = true; 350 } else { 351 if (LOG.isDebugEnabled()) { 352 LOG.debug(String.format( 353 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 354 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 355 } 356 } 357 break; 358 } 359 } 360 if (!hasAccess) { 361 return false; 362 } 363 } 364 } 365 return true; 366 } 367 368 /* 369 * Validate whether ACL ID is superuser. 370 */ 371 public static boolean isSuperUserId(String[] superUsers, Id id) { 372 for (String user : superUsers) { 373 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 374 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 375 return true; 376 } 377 } 378 return false; 379 } 380 381 @Override 382 public String toString() { 383 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 384 } 385 386 /** 387 * Adds this instance's identifier as a prefix to the passed <code>str</code> 388 * @param str String to amend. 389 * @return A new string with this instance's identifier as prefix: e.g. 390 * if passed 'hello world', the returned string could be 391 */ 392 public String prefix(final String str) { 393 return this.toString() + " " + str; 394 } 395 396 /** 397 * Get the znodes corresponding to the meta replicas from ZK 398 * @return list of znodes 399 * @throws KeeperException if a ZooKeeper operation fails 400 */ 401 public List<String> getMetaReplicaNodes() throws KeeperException { 402 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 403 return filterMetaReplicaNodes(childrenOfBaseNode); 404 } 405 406 /** 407 * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode 408 * for subsequent CREATE/DELETE operations on child nodes. 409 */ 410 public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException { 411 List<String> childrenOfBaseNode = 412 ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode); 413 return filterMetaReplicaNodes(childrenOfBaseNode); 414 } 415 416 /** 417 * @param nodes Input list of znodes 418 * @return Filtered list of znodes from nodes that belong to meta replica(s). 419 */ 420 private List<String> filterMetaReplicaNodes(List<String> nodes) { 421 if (nodes == null || nodes.isEmpty()) { 422 return new ArrayList<>(); 423 } 424 List<String> metaReplicaNodes = new ArrayList<>(2); 425 String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX); 426 for (String child : nodes) { 427 if (child.startsWith(pattern)) { 428 metaReplicaNodes.add(child); 429 } 430 } 431 return metaReplicaNodes; 432 } 433 434 /** 435 * Register the specified listener to receive ZooKeeper events. 436 * @param listener the listener to register 437 */ 438 public void registerListener(ZKListener listener) { 439 listeners.add(listener); 440 } 441 442 /** 443 * Register the specified listener to receive ZooKeeper events and add it as 444 * the first in the list of current listeners. 445 * @param listener the listener to register 446 */ 447 public void registerListenerFirst(ZKListener listener) { 448 listeners.add(0, listener); 449 } 450 451 public void unregisterListener(ZKListener listener) { 452 listeners.remove(listener); 453 } 454 455 /** 456 * Clean all existing listeners 457 */ 458 public void unregisterAllListeners() { 459 listeners.clear(); 460 } 461 462 /** 463 * Get a copy of current registered listeners 464 */ 465 public List<ZKListener> getListeners() { 466 return new ArrayList<>(listeners); 467 } 468 469 /** 470 * @return The number of currently registered listeners 471 */ 472 public int getNumberOfListeners() { 473 return listeners.size(); 474 } 475 476 /** 477 * Get the connection to ZooKeeper. 478 * @return connection reference to zookeeper 479 */ 480 public RecoverableZooKeeper getRecoverableZooKeeper() { 481 return recoverableZooKeeper; 482 } 483 484 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 485 recoverableZooKeeper.reconnectAfterExpiration(); 486 } 487 488 /** 489 * Get the quorum address of this instance. 490 * @return quorum string of this zookeeper connection instance 491 */ 492 public String getQuorum() { 493 return quorum; 494 } 495 496 /** 497 * Get the znodePaths. 498 * <p> 499 * Mainly used for mocking as mockito can not mock a field access. 500 */ 501 public ZNodePaths getZNodePaths() { 502 return znodePaths; 503 } 504 505 private void processEvent(WatchedEvent event) { 506 switch(event.getType()) { 507 // If event type is NONE, this is a connection status change 508 case None: { 509 connectionEvent(event); 510 break; 511 } 512 513 // Otherwise pass along to the listeners 514 case NodeCreated: { 515 for(ZKListener listener : listeners) { 516 listener.nodeCreated(event.getPath()); 517 } 518 break; 519 } 520 521 case NodeDeleted: { 522 for(ZKListener listener : listeners) { 523 listener.nodeDeleted(event.getPath()); 524 } 525 break; 526 } 527 528 case NodeDataChanged: { 529 for(ZKListener listener : listeners) { 530 listener.nodeDataChanged(event.getPath()); 531 } 532 break; 533 } 534 535 case NodeChildrenChanged: { 536 for(ZKListener listener : listeners) { 537 listener.nodeChildrenChanged(event.getPath()); 538 } 539 break; 540 } 541 default: 542 LOG.error("Invalid event of type {} received for path {}. Ignoring.", 543 event.getState(), event.getPath()); 544 } 545 } 546 547 /** 548 * Method called from ZooKeeper for events and connection status. 549 * <p> 550 * Valid events are passed along to listeners. Connection status changes 551 * are dealt with locally. 552 */ 553 @Override 554 public void process(WatchedEvent event) { 555 LOG.debug(prefix("Received ZooKeeper Event, " + 556 "type=" + event.getType() + ", " + 557 "state=" + event.getState() + ", " + 558 "path=" + event.getPath())); 559 zkEventProcessor.submit(() -> processEvent(event)); 560 } 561 562 // Connection management 563 564 /** 565 * Called when there is a connection-related event via the Watcher callback. 566 * <p> 567 * If Disconnected or Expired, this should shutdown the cluster. But, since 568 * we send a KeeperException.SessionExpiredException along with the abort 569 * call, it's possible for the Abortable to catch it and try to create a new 570 * session with ZooKeeper. This is what the client does in HCM. 571 * <p> 572 * @param event the connection-related event 573 */ 574 private void connectionEvent(WatchedEvent event) { 575 switch(event.getState()) { 576 case SyncConnected: 577 this.identifier = this.prefix + "-0x" + 578 Long.toHexString(this.recoverableZooKeeper.getSessionId()); 579 // Update our identifier. Otherwise ignore. 580 LOG.debug("{} connected", this.identifier); 581 break; 582 583 // Abort the server if Disconnected or Expired 584 case Disconnected: 585 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 586 break; 587 588 case Closed: 589 LOG.debug(prefix("ZooKeeper client closed, ignoring")); 590 break; 591 592 case Expired: 593 String msg = prefix(this.identifier + " received expired from " + 594 "ZooKeeper, aborting"); 595 // TODO: One thought is to add call to ZKListener so say, 596 // ZKNodeTracker can zero out its data values. 597 if (this.abortable != null) { 598 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 599 } 600 break; 601 602 case ConnectedReadOnly: 603 case SaslAuthenticated: 604 case AuthFailed: 605 break; 606 607 default: 608 throw new IllegalStateException("Received event is not valid: " + event.getState()); 609 } 610 } 611 612 /** 613 * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a 614 * timeout lets the callers fail-fast rather than wait forever for the sync to finish. 615 * <p> 616 * Executing this method before running other methods will ensure that the 617 * subsequent operations are up-to-date and consistent as of the time that 618 * the sync is complete. 619 * <p> 620 * This is used for compareAndSwap type operations where we need to read the 621 * data of an existing node and delete or transition that node, utilizing the 622 * previously read version and data. We want to ensure that the version read 623 * is up-to-date from when we begin the operation. 624 * <p> 625 */ 626 public void syncOrTimeout(String path) throws KeeperException { 627 final CountDownLatch latch = new CountDownLatch(1); 628 long startTime = EnvironmentEdgeManager.currentTime(); 629 this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); 630 try { 631 if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { 632 LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " 633 + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); 634 throw new KeeperException.RequestTimeoutException(); 635 } 636 } catch (InterruptedException e) { 637 LOG.warn("Interrupted waiting for ZK sync() to finish.", e); 638 Thread.currentThread().interrupt(); 639 return; 640 } 641 if (LOG.isDebugEnabled()) { 642 // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a 643 // useful metric to have since the latency of sync() impacts the callers. 644 LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); 645 } 646 } 647 648 /** 649 * Handles KeeperExceptions in client calls. 650 * <p> 651 * This may be temporary but for now this gives one place to deal with these. 652 * <p> 653 * TODO: Currently this method rethrows the exception to let the caller handle 654 * <p> 655 * @param ke the exception to rethrow 656 * @throws KeeperException if a ZooKeeper operation fails 657 */ 658 public void keeperException(KeeperException ke) throws KeeperException { 659 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 660 throw ke; 661 } 662 663 /** 664 * Handles InterruptedExceptions in client calls. 665 * @param ie the InterruptedException instance thrown 666 * @throws KeeperException the exception to throw, transformed from the InterruptedException 667 */ 668 public void interruptedException(InterruptedException ie) throws KeeperException { 669 interruptedExceptionNoThrow(ie, true); 670 // Throw a system error exception to let upper level handle it 671 KeeperException keeperException = new KeeperException.SystemErrorException(); 672 keeperException.initCause(ie); 673 throw keeperException; 674 } 675 676 /** 677 * Log the InterruptedException and interrupt current thread 678 * @param ie The IterruptedException to log 679 * @param throwLater Whether we will throw the exception latter 680 */ 681 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 682 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 683 + (throwLater ? " and rethrow a SystemErrorException" : "")), 684 ie); 685 // At least preserve interrupt. 686 Thread.currentThread().interrupt(); 687 } 688 689 /** 690 * Close the connection to ZooKeeper. 691 * 692 */ 693 @Override 694 public void close() { 695 zkEventProcessor.shutdownNow(); 696 try { 697 recoverableZooKeeper.close(); 698 } catch (InterruptedException e) { 699 Thread.currentThread().interrupt(); 700 } 701 } 702 703 public Configuration getConfiguration() { 704 return conf; 705 } 706 707 @Override 708 public void abort(String why, Throwable e) { 709 if (this.abortable != null) { 710 this.abortable.abort(why, e); 711 } else { 712 this.aborted = true; 713 } 714 } 715 716 @Override 717 public boolean isAborted() { 718 return this.abortable == null? this.aborted: this.abortable.isAborted(); 719 } 720}