001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.TimeUnit;
030import java.util.regex.Matcher;
031import java.util.regex.Pattern;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.AuthUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ZooKeeperConnectionException;
037import org.apache.hadoop.hbase.security.Superusers;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hadoop.security.UserGroupInformation;
041import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.zookeeper.KeeperException;
044import org.apache.zookeeper.WatchedEvent;
045import org.apache.zookeeper.Watcher;
046import org.apache.zookeeper.ZooDefs.Ids;
047import org.apache.zookeeper.ZooDefs.Perms;
048import org.apache.zookeeper.data.ACL;
049import org.apache.zookeeper.data.Id;
050import org.apache.zookeeper.data.Stat;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
056 * for each Master, RegionServer, and client process.
057 *
058 * <p>This is the only class that implements {@link Watcher}.  Other internal
059 * classes which need to be notified of ZooKeeper events must register with
060 * the local instance of this watcher via {@link #registerListener}.
061 *
062 * <p>This class also holds and manages the connection to ZooKeeper.  Code to
063 * deal with connection related events and exceptions are handled here.
064 */
065@InterfaceAudience.Private
066public class ZKWatcher implements Watcher, Abortable, Closeable {
067  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);
068
069  // Identifier for this watcher (for logging only).  It is made of the prefix
070  // passed on construction and the zookeeper sessionid.
071  private String prefix;
072  private String identifier;
073
074  // zookeeper quorum
075  private String quorum;
076
077  // zookeeper connection
078  private final RecoverableZooKeeper recoverableZooKeeper;
079
080  // abortable in case of zk failure
081  protected Abortable abortable;
082  // Used if abortable is null
083  private boolean aborted = false;
084
085  private final ZNodePaths znodePaths;
086
087  // listeners to be notified
088  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
089
090  // Single threaded executor pool that processes event notifications from Zookeeper. Events are
091  // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do
092  // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context.
093  // EventThread internally runs a single while loop to serially process all the events. When events
094  // are processed by the listeners in the same thread, that blocks the EventThread from processing
095  // subsequent events. Processing events in a separate thread frees up the event thread to continue
096  // and further prevents deadlocks if the process method itself makes other zookeeper calls.
097  // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
098  // requests using a single while loop and hence there is no performance degradation.
099  private final ExecutorService zkEventProcessor = Executors.newSingleThreadExecutor(
100    new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d")
101      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
102
103  private final Configuration conf;
104
105  private final long zkSyncTimeout;
106
107  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
108  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
109
110  /**
111   * Instantiate a ZooKeeper connection and watcher.
112   * @param identifier string that is passed to RecoverableZookeeper to be used as
113   *                   identifier for this instance. Use null for default.
114   * @throws IOException if the connection to ZooKeeper fails
115   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
116   */
117  public ZKWatcher(Configuration conf, String identifier,
118                   Abortable abortable) throws ZooKeeperConnectionException, IOException {
119    this(conf, identifier, abortable, false);
120  }
121
122  /**
123   * Instantiate a ZooKeeper connection and watcher.
124   * @param conf the configuration to use
125   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
126   *          this instance. Use null for default.
127   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
128   *          context.
129   * @param canCreateBaseZNode true if a base ZNode can be created
130   * @throws IOException if the connection to ZooKeeper fails
131   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
132   */
133  public ZKWatcher(Configuration conf, String identifier,
134                   Abortable abortable, boolean canCreateBaseZNode)
135    throws IOException, ZooKeeperConnectionException {
136    this(conf, identifier, abortable, canCreateBaseZNode, false);
137  }
138
139  /**
140   * Instantiate a ZooKeeper connection and watcher.
141   * @param conf the configuration to use
142   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
143   *          this instance. Use null for default.
144   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
145   *          context.
146   * @param canCreateBaseZNode true if a base ZNode can be created
147   * @param clientZK whether this watcher is set to access client ZK
148   * @throws IOException if the connection to ZooKeeper fails
149   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
150   *           ZNodes
151   */
152  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
153      boolean canCreateBaseZNode, boolean clientZK)
154      throws IOException, ZooKeeperConnectionException {
155    this.conf = conf;
156    if (clientZK) {
157      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
158      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
159      if (clientZkQuorumServers != null) {
160        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
161          // Don't allow same settings to avoid dead loop when master trying
162          // to sync meta information from server ZK to client ZK
163          throw new IllegalArgumentException(
164              "The quorum settings for client ZK should be different from those for server");
165        }
166        this.quorum = clientZkQuorumServers;
167      } else {
168        this.quorum = serverZkQuorumServers;
169      }
170    } else {
171      this.quorum = ZKConfig.getZKQuorumServersString(conf);
172    }
173    this.prefix = identifier;
174    // Identifier will get the sessionid appended later below down when we
175    // handle the syncconnect event.
176    this.identifier = identifier + "0x0";
177    this.abortable = abortable;
178    this.znodePaths = new ZNodePaths(conf);
179    PendingWatcher pendingWatcher = new PendingWatcher();
180    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
181    pendingWatcher.prepare(this);
182    if (canCreateBaseZNode) {
183      try {
184        createBaseZNodes();
185      } catch (ZooKeeperConnectionException zce) {
186        try {
187          this.recoverableZooKeeper.close();
188        } catch (InterruptedException ie) {
189          LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper);
190          Thread.currentThread().interrupt();
191        }
192        throw zce;
193      }
194    }
195    this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
196        HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
197  }
198
199  private void createBaseZNodes() throws ZooKeeperConnectionException {
200    try {
201      // Create all the necessary "directories" of znodes
202      ZKUtil.createWithParents(this, znodePaths.baseZNode);
203      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
204      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
205      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
206      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
207      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
208      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
209    } catch (KeeperException e) {
210      throw new ZooKeeperConnectionException(
211          prefix("Unexpected KeeperException creating base node"), e);
212    }
213  }
214
215  /**
216   * On master start, we check the znode ACLs under the root directory and set the ACLs properly
217   * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
218   * so that the existing znodes created with open permissions are now changed with restrictive
219   * perms.
220   */
221  public void checkAndSetZNodeAcls() {
222    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
223      LOG.info("not a secure deployment, proceeding");
224      return;
225    }
226
227    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
228    // correct.
229    try {
230      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
231
232      if (!isBaseZnodeAclSetup(actualAcls)) {
233        LOG.info("setting znode ACLs");
234        setZnodeAclsRecursive(znodePaths.baseZNode);
235      }
236    } catch(KeeperException.NoNodeException nne) {
237      return;
238    } catch(InterruptedException ie) {
239      interruptedExceptionNoThrow(ie, false);
240    } catch (IOException|KeeperException e) {
241      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
242    }
243  }
244
245  /**
246   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
247   * will be set last in case the master fails in between.
248   * @param znode the ZNode to set the permissions for
249   */
250  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
251    List<String> children = recoverableZooKeeper.getChildren(znode, false);
252
253    for (String child : children) {
254      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
255    }
256    List<ACL> acls = ZKUtil.createACL(this, znode, true);
257    LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
258    recoverableZooKeeper.setAcl(znode, acls, -1);
259  }
260
261  /**
262   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
263   * @param acls acls from zookeeper
264   * @return whether ACLs are set for the base znode
265   * @throws IOException if getting the current user fails
266   */
267  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
268    if (LOG.isDebugEnabled()) {
269      LOG.debug("Checking znode ACLs");
270    }
271    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
272    // Check whether ACL set for all superusers
273    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
274      return false;
275    }
276
277    // this assumes that current authenticated user is the same as zookeeper client user
278    // configured via JAAS
279    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
280
281    if (acls.isEmpty()) {
282      if (LOG.isDebugEnabled()) {
283        LOG.debug("ACL is empty");
284      }
285      return false;
286    }
287
288    for (ACL acl : acls) {
289      int perms = acl.getPerms();
290      Id id = acl.getId();
291      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
292      // and one for the hbase user
293      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
294        if (perms != Perms.READ) {
295          if (LOG.isDebugEnabled()) {
296            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
297              id, perms, Perms.READ));
298          }
299          return false;
300        }
301      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
302        if (perms != Perms.ALL) {
303          if (LOG.isDebugEnabled()) {
304            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
305              id, perms, Perms.ALL));
306          }
307          return false;
308        }
309      } else if ("sasl".equals(id.getScheme())) {
310        String name = id.getId();
311        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
312        Matcher match = NAME_PATTERN.matcher(name);
313        if (match.matches()) {
314          name = match.group(1);
315        }
316        if (name.equals(hbaseUser)) {
317          if (perms != Perms.ALL) {
318            if (LOG.isDebugEnabled()) {
319              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
320                id, perms, Perms.ALL));
321            }
322            return false;
323          }
324        } else {
325          if (LOG.isDebugEnabled()) {
326            LOG.debug("Unexpected shortname in SASL ACL: {}", id);
327          }
328          return false;
329        }
330      } else {
331        if (LOG.isDebugEnabled()) {
332          LOG.debug("unexpected ACL id '{}'", id);
333        }
334        return false;
335      }
336    }
337    return true;
338  }
339
340  /*
341   * Validate whether ACL set for all superusers.
342   */
343  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
344    for (String user : superUsers) {
345      boolean hasAccess = false;
346      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
347      if (!AuthUtil.isGroupPrincipal(user)) {
348        for (ACL acl : acls) {
349          if (user.equals(acl.getId().getId())) {
350            if (acl.getPerms() == Perms.ALL) {
351              hasAccess = true;
352            } else {
353              if (LOG.isDebugEnabled()) {
354                LOG.debug(String.format(
355                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
356                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
357              }
358            }
359            break;
360          }
361        }
362        if (!hasAccess) {
363          return false;
364        }
365      }
366    }
367    return true;
368  }
369
370  /*
371   * Validate whether ACL ID is superuser.
372   */
373  public static boolean isSuperUserId(String[] superUsers, Id id) {
374    for (String user : superUsers) {
375      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
376      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
377        return true;
378      }
379    }
380    return false;
381  }
382
383  @Override
384  public String toString() {
385    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
386  }
387
388  /**
389   * Adds this instance's identifier as a prefix to the passed <code>str</code>
390   * @param str String to amend.
391   * @return A new string with this instance's identifier as prefix: e.g.
392   *         if passed 'hello world', the returned string could be
393   */
394  public String prefix(final String str) {
395    return this.toString() + " " + str;
396  }
397
398  /**
399   * Get the znodes corresponding to the meta replicas from ZK
400   * @return list of znodes
401   * @throws KeeperException if a ZooKeeper operation fails
402   */
403  public List<String> getMetaReplicaNodes() throws KeeperException {
404    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
405    return filterMetaReplicaNodes(childrenOfBaseNode);
406  }
407
408  /**
409   * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode
410   * for subsequent CREATE/DELETE operations on child nodes.
411   */
412  public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException {
413    List<String> childrenOfBaseNode =
414        ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode);
415    return filterMetaReplicaNodes(childrenOfBaseNode);
416  }
417
418  /**
419   * @param nodes Input list of znodes
420   * @return Filtered list of znodes from nodes that belong to meta replica(s).
421   */
422  private List<String> filterMetaReplicaNodes(List<String> nodes) {
423    if (nodes == null || nodes.isEmpty()) {
424      return new ArrayList<>();
425    }
426    List<String> metaReplicaNodes = new ArrayList<>(2);
427    String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX);
428    for (String child : nodes) {
429      if (child.startsWith(pattern)) {
430        metaReplicaNodes.add(child);
431      }
432    }
433    return metaReplicaNodes;
434  }
435
436  /**
437   * Register the specified listener to receive ZooKeeper events.
438   * @param listener the listener to register
439   */
440  public void registerListener(ZKListener listener) {
441    listeners.add(listener);
442  }
443
444  /**
445   * Register the specified listener to receive ZooKeeper events and add it as
446   * the first in the list of current listeners.
447   * @param listener the listener to register
448   */
449  public void registerListenerFirst(ZKListener listener) {
450    listeners.add(0, listener);
451  }
452
453  public void unregisterListener(ZKListener listener) {
454    listeners.remove(listener);
455  }
456
457  /**
458   * Clean all existing listeners
459   */
460  public void unregisterAllListeners() {
461    listeners.clear();
462  }
463
464  /**
465   * Get a copy of current registered listeners
466   */
467  public List<ZKListener> getListeners() {
468    return new ArrayList<>(listeners);
469  }
470
471  /**
472   * @return The number of currently registered listeners
473   */
474  public int getNumberOfListeners() {
475    return listeners.size();
476  }
477
478  /**
479   * Get the connection to ZooKeeper.
480   * @return connection reference to zookeeper
481   */
482  public RecoverableZooKeeper getRecoverableZooKeeper() {
483    return recoverableZooKeeper;
484  }
485
486  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
487    recoverableZooKeeper.reconnectAfterExpiration();
488  }
489
490  /**
491   * Get the quorum address of this instance.
492   * @return quorum string of this zookeeper connection instance
493   */
494  public String getQuorum() {
495    return quorum;
496  }
497
498  /**
499   * Get the znodePaths.
500   * <p>
501   * Mainly used for mocking as mockito can not mock a field access.
502   */
503  public ZNodePaths getZNodePaths() {
504    return znodePaths;
505  }
506
507  private void processEvent(WatchedEvent event) {
508    switch(event.getType()) {
509      // If event type is NONE, this is a connection status change
510      case None: {
511        connectionEvent(event);
512        break;
513      }
514
515      // Otherwise pass along to the listeners
516      case NodeCreated: {
517        for(ZKListener listener : listeners) {
518          listener.nodeCreated(event.getPath());
519        }
520        break;
521      }
522
523      case NodeDeleted: {
524        for(ZKListener listener : listeners) {
525          listener.nodeDeleted(event.getPath());
526        }
527        break;
528      }
529
530      case NodeDataChanged: {
531        for(ZKListener listener : listeners) {
532          listener.nodeDataChanged(event.getPath());
533        }
534        break;
535      }
536
537      case NodeChildrenChanged: {
538        for(ZKListener listener : listeners) {
539          listener.nodeChildrenChanged(event.getPath());
540        }
541        break;
542      }
543      default:
544        LOG.error("Invalid event of type {} received for path {}. Ignoring.",
545            event.getState(), event.getPath());
546    }
547  }
548
549  /**
550   * Method called from ZooKeeper for events and connection status.
551   * <p>
552   * Valid events are passed along to listeners.  Connection status changes
553   * are dealt with locally.
554   */
555  @Override
556  public void process(WatchedEvent event) {
557    LOG.debug(prefix("Received ZooKeeper Event, " +
558        "type=" + event.getType() + ", " +
559        "state=" + event.getState() + ", " +
560        "path=" + event.getPath()));
561    zkEventProcessor.submit(() -> processEvent(event));
562  }
563
564  // Connection management
565
566  /**
567   * Called when there is a connection-related event via the Watcher callback.
568   * <p>
569   * If Disconnected or Expired, this should shutdown the cluster. But, since
570   * we send a KeeperException.SessionExpiredException along with the abort
571   * call, it's possible for the Abortable to catch it and try to create a new
572   * session with ZooKeeper. This is what the client does in HCM.
573   * <p>
574   * @param event the connection-related event
575   */
576  private void connectionEvent(WatchedEvent event) {
577    switch(event.getState()) {
578      case SyncConnected:
579        this.identifier = this.prefix + "-0x" +
580          Long.toHexString(this.recoverableZooKeeper.getSessionId());
581        // Update our identifier.  Otherwise ignore.
582        LOG.debug("{} connected", this.identifier);
583        break;
584
585      // Abort the server if Disconnected or Expired
586      case Disconnected:
587        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
588        break;
589
590      case Closed:
591        LOG.debug(prefix("ZooKeeper client closed, ignoring"));
592        break;
593
594      case Expired:
595        String msg = prefix(this.identifier + " received expired from " +
596          "ZooKeeper, aborting");
597        // TODO: One thought is to add call to ZKListener so say,
598        // ZKNodeTracker can zero out its data values.
599        if (this.abortable != null) {
600          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
601        }
602        break;
603
604      case ConnectedReadOnly:
605      case SaslAuthenticated:
606      case AuthFailed:
607        break;
608
609      default:
610        throw new IllegalStateException("Received event is not valid: " + event.getState());
611    }
612  }
613
614  /**
615   * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a
616   * timeout lets the callers fail-fast rather than wait forever for the sync to finish.
617   * <p>
618   * Executing this method before running other methods will ensure that the
619   * subsequent operations are up-to-date and consistent as of the time that
620   * the sync is complete.
621   * <p>
622   * This is used for compareAndSwap type operations where we need to read the
623   * data of an existing node and delete or transition that node, utilizing the
624   * previously read version and data.  We want to ensure that the version read
625   * is up-to-date from when we begin the operation.
626   * <p>
627   */
628  public void syncOrTimeout(String path) throws KeeperException {
629    final CountDownLatch latch = new CountDownLatch(1);
630    long startTime = EnvironmentEdgeManager.currentTime();
631    this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null);
632    try {
633      if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
634        LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points "
635            + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout);
636        throw new KeeperException.RequestTimeoutException();
637      }
638    } catch (InterruptedException e) {
639      LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
640      Thread.currentThread().interrupt();
641      return;
642    }
643    if (LOG.isDebugEnabled()) {
644      // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a
645      // useful metric to have since the latency of sync() impacts the callers.
646      LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime);
647    }
648  }
649
650  /**
651   * Handles KeeperExceptions in client calls.
652   * <p>
653   * This may be temporary but for now this gives one place to deal with these.
654   * <p>
655   * TODO: Currently this method rethrows the exception to let the caller handle
656   * <p>
657   * @param ke the exception to rethrow
658   * @throws KeeperException if a ZooKeeper operation fails
659   */
660  public void keeperException(KeeperException ke) throws KeeperException {
661    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
662    throw ke;
663  }
664
665  /**
666   * Handles InterruptedExceptions in client calls.
667   * @param ie the InterruptedException instance thrown
668   * @throws KeeperException the exception to throw, transformed from the InterruptedException
669   */
670  public void interruptedException(InterruptedException ie) throws KeeperException {
671    interruptedExceptionNoThrow(ie, true);
672    // Throw a system error exception to let upper level handle it
673    KeeperException keeperException = new KeeperException.SystemErrorException();
674    keeperException.initCause(ie);
675    throw keeperException;
676  }
677
678  /**
679   * Log the InterruptedException and interrupt current thread
680   * @param ie The IterruptedException to log
681   * @param throwLater Whether we will throw the exception latter
682   */
683  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
684    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
685        + (throwLater ? " and rethrow a SystemErrorException" : "")),
686      ie);
687    // At least preserve interrupt.
688    Thread.currentThread().interrupt();
689  }
690
691  /**
692   * Close the connection to ZooKeeper.
693   *
694   */
695  @Override
696  public void close() {
697    try {
698      recoverableZooKeeper.close();
699    } catch (InterruptedException e) {
700      Thread.currentThread().interrupt();
701    } finally {
702      zkEventProcessor.shutdownNow();
703    }
704  }
705
706  public Configuration getConfiguration() {
707    return conf;
708  }
709
710  @Override
711  public void abort(String why, Throwable e) {
712    if (this.abortable != null) {
713      this.abortable.abort(why, e);
714    } else {
715      this.aborted = true;
716    }
717  }
718
719  @Override
720  public boolean isAborted() {
721    return this.abortable == null? this.aborted: this.abortable.isAborted();
722  }
723}