001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.TimeUnit;
030import java.util.regex.Matcher;
031import java.util.regex.Pattern;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.AuthUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ZooKeeperConnectionException;
037import org.apache.hadoop.hbase.security.Superusers;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hadoop.security.UserGroupInformation;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.zookeeper.KeeperException;
043import org.apache.zookeeper.WatchedEvent;
044import org.apache.zookeeper.Watcher;
045import org.apache.zookeeper.ZooDefs.Ids;
046import org.apache.zookeeper.ZooDefs.Perms;
047import org.apache.zookeeper.data.ACL;
048import org.apache.zookeeper.data.Id;
049import org.apache.zookeeper.data.Stat;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
055 * for each Master, RegionServer, and client process.
056 *
057 * <p>This is the only class that implements {@link Watcher}.  Other internal
058 * classes which need to be notified of ZooKeeper events must register with
059 * the local instance of this watcher via {@link #registerListener}.
060 *
061 * <p>This class also holds and manages the connection to ZooKeeper.  Code to
062 * deal with connection related events and exceptions are handled here.
063 */
064@InterfaceAudience.Private
065public class ZKWatcher implements Watcher, Abortable, Closeable {
066  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);
067
068  // Identifier for this watcher (for logging only).  It is made of the prefix
069  // passed on construction and the zookeeper sessionid.
070  private String prefix;
071  private String identifier;
072
073  // zookeeper quorum
074  private String quorum;
075
076  // zookeeper connection
077  private final RecoverableZooKeeper recoverableZooKeeper;
078
079  // abortable in case of zk failure
080  protected Abortable abortable;
081  // Used if abortable is null
082  private boolean aborted = false;
083
084  private final ZNodePaths znodePaths;
085
086  // listeners to be notified
087  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
088
089  // Single threaded executor pool that processes event notifications from Zookeeper. Events are
090  // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do
091  // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context.
092  // EventThread internally runs a single while loop to serially process all the events. When events
093  // are processed by the listeners in the same thread, that blocks the EventThread from processing
094  // subsequent events. Processing events in a separate thread frees up the event thread to continue
095  // and further prevents deadlocks if the process method itself makes other zookeeper calls.
096  // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
097  // requests using a single while loop and hence there is no performance degradation.
098  private final ExecutorService zkEventProcessor =
099      Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor"));
100
101  private final Configuration conf;
102
103  private final long zkSyncTimeout;
104
105  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
106  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
107
108  /**
109   * Instantiate a ZooKeeper connection and watcher.
110   * @param identifier string that is passed to RecoverableZookeeper to be used as
111   *                   identifier for this instance. Use null for default.
112   * @throws IOException if the connection to ZooKeeper fails
113   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
114   */
115  public ZKWatcher(Configuration conf, String identifier,
116                   Abortable abortable) throws ZooKeeperConnectionException, IOException {
117    this(conf, identifier, abortable, false);
118  }
119
120  /**
121   * Instantiate a ZooKeeper connection and watcher.
122   * @param conf the configuration to use
123   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
124   *          this instance. Use null for default.
125   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
126   *          context.
127   * @param canCreateBaseZNode true if a base ZNode can be created
128   * @throws IOException if the connection to ZooKeeper fails
129   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
130   */
131  public ZKWatcher(Configuration conf, String identifier,
132                   Abortable abortable, boolean canCreateBaseZNode)
133    throws IOException, ZooKeeperConnectionException {
134    this(conf, identifier, abortable, canCreateBaseZNode, false);
135  }
136
137  /**
138   * Instantiate a ZooKeeper connection and watcher.
139   * @param conf the configuration to use
140   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
141   *          this instance. Use null for default.
142   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
143   *          context.
144   * @param canCreateBaseZNode true if a base ZNode can be created
145   * @param clientZK whether this watcher is set to access client ZK
146   * @throws IOException if the connection to ZooKeeper fails
147   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
148   *           ZNodes
149   */
150  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
151      boolean canCreateBaseZNode, boolean clientZK)
152      throws IOException, ZooKeeperConnectionException {
153    this.conf = conf;
154    if (clientZK) {
155      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
156      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
157      if (clientZkQuorumServers != null) {
158        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
159          // Don't allow same settings to avoid dead loop when master trying
160          // to sync meta information from server ZK to client ZK
161          throw new IllegalArgumentException(
162              "The quorum settings for client ZK should be different from those for server");
163        }
164        this.quorum = clientZkQuorumServers;
165      } else {
166        this.quorum = serverZkQuorumServers;
167      }
168    } else {
169      this.quorum = ZKConfig.getZKQuorumServersString(conf);
170    }
171    this.prefix = identifier;
172    // Identifier will get the sessionid appended later below down when we
173    // handle the syncconnect event.
174    this.identifier = identifier + "0x0";
175    this.abortable = abortable;
176    this.znodePaths = new ZNodePaths(conf);
177    PendingWatcher pendingWatcher = new PendingWatcher();
178    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
179    pendingWatcher.prepare(this);
180    if (canCreateBaseZNode) {
181      try {
182        createBaseZNodes();
183      } catch (ZooKeeperConnectionException zce) {
184        try {
185          this.recoverableZooKeeper.close();
186        } catch (InterruptedException ie) {
187          LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper);
188          Thread.currentThread().interrupt();
189        }
190        throw zce;
191      }
192    }
193    this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
194        HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
195  }
196
197  private void createBaseZNodes() throws ZooKeeperConnectionException {
198    try {
199      // Create all the necessary "directories" of znodes
200      ZKUtil.createWithParents(this, znodePaths.baseZNode);
201      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
202      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
203      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
204      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
205      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
206      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
207    } catch (KeeperException e) {
208      throw new ZooKeeperConnectionException(
209          prefix("Unexpected KeeperException creating base node"), e);
210    }
211  }
212
213  /**
214   * On master start, we check the znode ACLs under the root directory and set the ACLs properly
215   * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
216   * so that the existing znodes created with open permissions are now changed with restrictive
217   * perms.
218   */
219  public void checkAndSetZNodeAcls() {
220    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
221      LOG.info("not a secure deployment, proceeding");
222      return;
223    }
224
225    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
226    // correct.
227    try {
228      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
229
230      if (!isBaseZnodeAclSetup(actualAcls)) {
231        LOG.info("setting znode ACLs");
232        setZnodeAclsRecursive(znodePaths.baseZNode);
233      }
234    } catch(KeeperException.NoNodeException nne) {
235      return;
236    } catch(InterruptedException ie) {
237      interruptedExceptionNoThrow(ie, false);
238    } catch (IOException|KeeperException e) {
239      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
240    }
241  }
242
243  /**
244   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
245   * will be set last in case the master fails in between.
246   * @param znode the ZNode to set the permissions for
247   */
248  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
249    List<String> children = recoverableZooKeeper.getChildren(znode, false);
250
251    for (String child : children) {
252      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
253    }
254    List<ACL> acls = ZKUtil.createACL(this, znode, true);
255    LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
256    recoverableZooKeeper.setAcl(znode, acls, -1);
257  }
258
259  /**
260   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
261   * @param acls acls from zookeeper
262   * @return whether ACLs are set for the base znode
263   * @throws IOException if getting the current user fails
264   */
265  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
266    if (LOG.isDebugEnabled()) {
267      LOG.debug("Checking znode ACLs");
268    }
269    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
270    // Check whether ACL set for all superusers
271    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
272      return false;
273    }
274
275    // this assumes that current authenticated user is the same as zookeeper client user
276    // configured via JAAS
277    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
278
279    if (acls.isEmpty()) {
280      if (LOG.isDebugEnabled()) {
281        LOG.debug("ACL is empty");
282      }
283      return false;
284    }
285
286    for (ACL acl : acls) {
287      int perms = acl.getPerms();
288      Id id = acl.getId();
289      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
290      // and one for the hbase user
291      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
292        if (perms != Perms.READ) {
293          if (LOG.isDebugEnabled()) {
294            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
295              id, perms, Perms.READ));
296          }
297          return false;
298        }
299      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
300        if (perms != Perms.ALL) {
301          if (LOG.isDebugEnabled()) {
302            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
303              id, perms, Perms.ALL));
304          }
305          return false;
306        }
307      } else if ("sasl".equals(id.getScheme())) {
308        String name = id.getId();
309        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
310        Matcher match = NAME_PATTERN.matcher(name);
311        if (match.matches()) {
312          name = match.group(1);
313        }
314        if (name.equals(hbaseUser)) {
315          if (perms != Perms.ALL) {
316            if (LOG.isDebugEnabled()) {
317              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
318                id, perms, Perms.ALL));
319            }
320            return false;
321          }
322        } else {
323          if (LOG.isDebugEnabled()) {
324            LOG.debug("Unexpected shortname in SASL ACL: {}", id);
325          }
326          return false;
327        }
328      } else {
329        if (LOG.isDebugEnabled()) {
330          LOG.debug("unexpected ACL id '{}'", id);
331        }
332        return false;
333      }
334    }
335    return true;
336  }
337
338  /*
339   * Validate whether ACL set for all superusers.
340   */
341  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
342    for (String user : superUsers) {
343      boolean hasAccess = false;
344      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
345      if (!AuthUtil.isGroupPrincipal(user)) {
346        for (ACL acl : acls) {
347          if (user.equals(acl.getId().getId())) {
348            if (acl.getPerms() == Perms.ALL) {
349              hasAccess = true;
350            } else {
351              if (LOG.isDebugEnabled()) {
352                LOG.debug(String.format(
353                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
354                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
355              }
356            }
357            break;
358          }
359        }
360        if (!hasAccess) {
361          return false;
362        }
363      }
364    }
365    return true;
366  }
367
368  /*
369   * Validate whether ACL ID is superuser.
370   */
371  public static boolean isSuperUserId(String[] superUsers, Id id) {
372    for (String user : superUsers) {
373      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
374      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
375        return true;
376      }
377    }
378    return false;
379  }
380
381  @Override
382  public String toString() {
383    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
384  }
385
386  /**
387   * Adds this instance's identifier as a prefix to the passed <code>str</code>
388   * @param str String to amend.
389   * @return A new string with this instance's identifier as prefix: e.g.
390   *         if passed 'hello world', the returned string could be
391   */
392  public String prefix(final String str) {
393    return this.toString() + " " + str;
394  }
395
396  /**
397   * Get the znodes corresponding to the meta replicas from ZK
398   * @return list of znodes
399   * @throws KeeperException if a ZooKeeper operation fails
400   */
401  public List<String> getMetaReplicaNodes() throws KeeperException {
402    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
403    return filterMetaReplicaNodes(childrenOfBaseNode);
404  }
405
406  /**
407   * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode
408   * for subsequent CREATE/DELETE operations on child nodes.
409   */
410  public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException {
411    List<String> childrenOfBaseNode =
412        ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode);
413    return filterMetaReplicaNodes(childrenOfBaseNode);
414  }
415
416  /**
417   * @param nodes Input list of znodes
418   * @return Filtered list of znodes from nodes that belong to meta replica(s).
419   */
420  private List<String> filterMetaReplicaNodes(List<String> nodes) {
421    if (nodes == null || nodes.isEmpty()) {
422      return new ArrayList<>();
423    }
424    List<String> metaReplicaNodes = new ArrayList<>(2);
425    String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX);
426    for (String child : nodes) {
427      if (child.startsWith(pattern)) {
428        metaReplicaNodes.add(child);
429      }
430    }
431    return metaReplicaNodes;
432  }
433
434  /**
435   * Register the specified listener to receive ZooKeeper events.
436   * @param listener the listener to register
437   */
438  public void registerListener(ZKListener listener) {
439    listeners.add(listener);
440  }
441
442  /**
443   * Register the specified listener to receive ZooKeeper events and add it as
444   * the first in the list of current listeners.
445   * @param listener the listener to register
446   */
447  public void registerListenerFirst(ZKListener listener) {
448    listeners.add(0, listener);
449  }
450
451  public void unregisterListener(ZKListener listener) {
452    listeners.remove(listener);
453  }
454
455  /**
456   * Clean all existing listeners
457   */
458  public void unregisterAllListeners() {
459    listeners.clear();
460  }
461
462  /**
463   * Get a copy of current registered listeners
464   */
465  public List<ZKListener> getListeners() {
466    return new ArrayList<>(listeners);
467  }
468
469  /**
470   * @return The number of currently registered listeners
471   */
472  public int getNumberOfListeners() {
473    return listeners.size();
474  }
475
476  /**
477   * Get the connection to ZooKeeper.
478   * @return connection reference to zookeeper
479   */
480  public RecoverableZooKeeper getRecoverableZooKeeper() {
481    return recoverableZooKeeper;
482  }
483
484  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
485    recoverableZooKeeper.reconnectAfterExpiration();
486  }
487
488  /**
489   * Get the quorum address of this instance.
490   * @return quorum string of this zookeeper connection instance
491   */
492  public String getQuorum() {
493    return quorum;
494  }
495
496  /**
497   * Get the znodePaths.
498   * <p>
499   * Mainly used for mocking as mockito can not mock a field access.
500   */
501  public ZNodePaths getZNodePaths() {
502    return znodePaths;
503  }
504
505  private void processEvent(WatchedEvent event) {
506    switch(event.getType()) {
507      // If event type is NONE, this is a connection status change
508      case None: {
509        connectionEvent(event);
510        break;
511      }
512
513      // Otherwise pass along to the listeners
514      case NodeCreated: {
515        for(ZKListener listener : listeners) {
516          listener.nodeCreated(event.getPath());
517        }
518        break;
519      }
520
521      case NodeDeleted: {
522        for(ZKListener listener : listeners) {
523          listener.nodeDeleted(event.getPath());
524        }
525        break;
526      }
527
528      case NodeDataChanged: {
529        for(ZKListener listener : listeners) {
530          listener.nodeDataChanged(event.getPath());
531        }
532        break;
533      }
534
535      case NodeChildrenChanged: {
536        for(ZKListener listener : listeners) {
537          listener.nodeChildrenChanged(event.getPath());
538        }
539        break;
540      }
541      default:
542        LOG.error("Invalid event of type {} received for path {}. Ignoring.",
543            event.getState(), event.getPath());
544    }
545  }
546
547  /**
548   * Method called from ZooKeeper for events and connection status.
549   * <p>
550   * Valid events are passed along to listeners.  Connection status changes
551   * are dealt with locally.
552   */
553  @Override
554  public void process(WatchedEvent event) {
555    LOG.debug(prefix("Received ZooKeeper Event, " +
556        "type=" + event.getType() + ", " +
557        "state=" + event.getState() + ", " +
558        "path=" + event.getPath()));
559    zkEventProcessor.submit(() -> processEvent(event));
560  }
561
562  // Connection management
563
564  /**
565   * Called when there is a connection-related event via the Watcher callback.
566   * <p>
567   * If Disconnected or Expired, this should shutdown the cluster. But, since
568   * we send a KeeperException.SessionExpiredException along with the abort
569   * call, it's possible for the Abortable to catch it and try to create a new
570   * session with ZooKeeper. This is what the client does in HCM.
571   * <p>
572   * @param event the connection-related event
573   */
574  private void connectionEvent(WatchedEvent event) {
575    switch(event.getState()) {
576      case SyncConnected:
577        this.identifier = this.prefix + "-0x" +
578          Long.toHexString(this.recoverableZooKeeper.getSessionId());
579        // Update our identifier.  Otherwise ignore.
580        LOG.debug("{} connected", this.identifier);
581        break;
582
583      // Abort the server if Disconnected or Expired
584      case Disconnected:
585        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
586        break;
587
588      case Closed:
589        LOG.debug(prefix("ZooKeeper client closed, ignoring"));
590        break;
591
592      case Expired:
593        String msg = prefix(this.identifier + " received expired from " +
594          "ZooKeeper, aborting");
595        // TODO: One thought is to add call to ZKListener so say,
596        // ZKNodeTracker can zero out its data values.
597        if (this.abortable != null) {
598          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
599        }
600        break;
601
602      case ConnectedReadOnly:
603      case SaslAuthenticated:
604      case AuthFailed:
605        break;
606
607      default:
608        throw new IllegalStateException("Received event is not valid: " + event.getState());
609    }
610  }
611
612  /**
613   * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a
614   * timeout lets the callers fail-fast rather than wait forever for the sync to finish.
615   * <p>
616   * Executing this method before running other methods will ensure that the
617   * subsequent operations are up-to-date and consistent as of the time that
618   * the sync is complete.
619   * <p>
620   * This is used for compareAndSwap type operations where we need to read the
621   * data of an existing node and delete or transition that node, utilizing the
622   * previously read version and data.  We want to ensure that the version read
623   * is up-to-date from when we begin the operation.
624   * <p>
625   */
626  public void syncOrTimeout(String path) throws KeeperException {
627    final CountDownLatch latch = new CountDownLatch(1);
628    long startTime = EnvironmentEdgeManager.currentTime();
629    this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null);
630    try {
631      if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
632        LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points "
633            + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout);
634        throw new KeeperException.RequestTimeoutException();
635      }
636    } catch (InterruptedException e) {
637      LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
638      Thread.currentThread().interrupt();
639      return;
640    }
641    if (LOG.isDebugEnabled()) {
642      // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a
643      // useful metric to have since the latency of sync() impacts the callers.
644      LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime);
645    }
646  }
647
648  /**
649   * Handles KeeperExceptions in client calls.
650   * <p>
651   * This may be temporary but for now this gives one place to deal with these.
652   * <p>
653   * TODO: Currently this method rethrows the exception to let the caller handle
654   * <p>
655   * @param ke the exception to rethrow
656   * @throws KeeperException if a ZooKeeper operation fails
657   */
658  public void keeperException(KeeperException ke) throws KeeperException {
659    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
660    throw ke;
661  }
662
663  /**
664   * Handles InterruptedExceptions in client calls.
665   * @param ie the InterruptedException instance thrown
666   * @throws KeeperException the exception to throw, transformed from the InterruptedException
667   */
668  public void interruptedException(InterruptedException ie) throws KeeperException {
669    interruptedExceptionNoThrow(ie, true);
670    // Throw a system error exception to let upper level handle it
671    KeeperException keeperException = new KeeperException.SystemErrorException();
672    keeperException.initCause(ie);
673    throw keeperException;
674  }
675
676  /**
677   * Log the InterruptedException and interrupt current thread
678   * @param ie The IterruptedException to log
679   * @param throwLater Whether we will throw the exception latter
680   */
681  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
682    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
683        + (throwLater ? " and rethrow a SystemErrorException" : "")),
684      ie);
685    // At least preserve interrupt.
686    Thread.currentThread().interrupt();
687  }
688
689  /**
690   * Close the connection to ZooKeeper.
691   *
692   */
693  @Override
694  public void close() {
695    zkEventProcessor.shutdownNow();
696    try {
697      recoverableZooKeeper.close();
698    } catch (InterruptedException e) {
699      Thread.currentThread().interrupt();
700    }
701  }
702
703  public Configuration getConfiguration() {
704    return conf;
705  }
706
707  @Override
708  public void abort(String why, Throwable e) {
709    if (this.abortable != null) {
710      this.abortable.abort(why, e);
711    } else {
712      this.aborted = true;
713    }
714  }
715
716  @Override
717  public boolean isAborted() {
718    return this.abortable == null? this.aborted: this.abortable.isAborted();
719  }
720}