001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.AuthUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ZooKeeperConnectionException;
036import org.apache.hadoop.hbase.security.Superusers;
037import org.apache.hadoop.hbase.trace.TraceUtil;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hadoop.security.UserGroupInformation;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.zookeeper.KeeperException;
043import org.apache.zookeeper.WatchedEvent;
044import org.apache.zookeeper.Watcher;
045import org.apache.zookeeper.ZooDefs.Ids;
046import org.apache.zookeeper.ZooDefs.Perms;
047import org.apache.zookeeper.data.ACL;
048import org.apache.zookeeper.data.Id;
049import org.apache.zookeeper.data.Stat;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054
055/**
056 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated for each Master,
057 * RegionServer, and client process.
058 * <p>
059 * This is the only class that implements {@link Watcher}. Other internal classes which need to be
060 * notified of ZooKeeper events must register with the local instance of this watcher via
061 * {@link #registerListener}.
062 * <p>
 * This class also holds and manages the connection to ZooKeeper. Connection-related events and
 * exceptions are handled here.
065 */
066@InterfaceAudience.Private
067public class ZKWatcher implements Watcher, Abortable, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);

  // Identifier for this watcher (for logging only). It is made of the prefix
  // passed on construction and the zookeeper sessionid.
  private final String prefix;
  // Not final: connectionEvent() reassigns it on every SyncConnected event (that code runs on the
  // zkEventProcessor thread), while process() and toString() read it from their callers' threads.
  private String identifier;

  // zookeeper quorum
  private final String quorum;

  // zookeeper connection
  private final RecoverableZooKeeper recoverableZooKeeper;

  // abortable in case of zk failure
  protected Abortable abortable;
  // Used if abortable is null
  private boolean aborted = false;

  private final ZNodePaths znodePaths;

  // listeners to be notified
  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();

  // Single threaded executor pool that processes event notifications from Zookeeper. Events are
  // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do
  // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context.
  // EventThread internally runs a single while loop to serially process all the events. When events
  // are processed by the listeners in the same thread, that blocks the EventThread from processing
  // subsequent events. Processing events in a separate thread frees up the event thread to continue
  // and further prevents deadlocks if the process method itself makes other zookeeper calls.
  // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
  // requests using a single while loop and hence there is no performance degradation.
  private final ExecutorService zkEventProcessor = Executors
    .newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d")
      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private final Configuration conf;

  // Maximum time, in milliseconds, that syncOrTimeout() waits for a sync() call to complete.
  private final long zkSyncTimeout;

  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
110
  /**
   * Instantiate a ZooKeeper connection and watcher. Delegates to
   * {@link #ZKWatcher(Configuration, String, Abortable, boolean)} with {@code canCreateBaseZNode}
   * set to {@code false}.
   * @param conf       the configuration to use
   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
   *                   this instance. Use null for default.
   * @param abortable  receiver of abort requests; the other methods of this class null-check it
   *                   before use, so it may be null
   * @throws IOException                  if the connection to ZooKeeper fails
   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
   */
  public ZKWatcher(Configuration conf, String identifier, Abortable abortable)
    throws ZooKeeperConnectionException, IOException {
    this(conf, identifier, abortable, false);
  }
122
  /**
   * Instantiate a ZooKeeper connection and watcher. Delegates to
   * {@link #ZKWatcher(Configuration, String, Abortable, boolean, boolean)} with {@code clientZK}
   * set to {@code false}.
   * @param conf               the configuration to use
   * @param identifier         string that is passed to RecoverableZookeeper to be used as
   *                           identifier for this instance. Use null for default.
   * @param abortable          Can be null if, on error, there is no host to abort: e.g. client
   *                           context.
   * @param canCreateBaseZNode true if a base ZNode can be created
   * @throws IOException                  if the connection to ZooKeeper fails
   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
   */
  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
    boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException {
    this(conf, identifier, abortable, canCreateBaseZNode, false);
  }
138
139  /**
140   * Instantiate a ZooKeeper connection and watcher.
141   * @param conf               the configuration to use
142   * @param identifier         string that is passed to RecoverableZookeeper to be used as
143   *                           identifier for this instance. Use null for default.
144   * @param abortable          Can be null if there is on error there is no host to abort: e.g.
145   *                           client context.
146   * @param canCreateBaseZNode true if a base ZNode can be created
147   * @param clientZK           whether this watcher is set to access client ZK
148   * @throws IOException                  if the connection to ZooKeeper fails
149   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
150   *                                      ZNodes
151   */
152  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
153    boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException {
154    this.conf = conf;
155    if (clientZK) {
156      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
157      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
158      if (clientZkQuorumServers != null) {
159        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
160          // Don't allow same settings to avoid dead loop when master trying
161          // to sync meta information from server ZK to client ZK
162          throw new IllegalArgumentException(
163            "The quorum settings for client ZK should be different from those for server");
164        }
165        this.quorum = clientZkQuorumServers;
166      } else {
167        this.quorum = serverZkQuorumServers;
168      }
169    } else {
170      this.quorum = ZKConfig.getZKQuorumServersString(conf);
171    }
172    this.prefix = identifier;
173    // Identifier will get the sessionid appended later below down when we
174    // handle the syncconnect event.
175    this.identifier = identifier + "0x0";
176    this.abortable = abortable;
177    this.znodePaths = new ZNodePaths(conf);
178    PendingWatcher pendingWatcher = new PendingWatcher();
179    this.recoverableZooKeeper = RecoverableZooKeeper.connect(conf, quorum, pendingWatcher,
180      identifier, ZKConfig.getZKClientConfig(conf));
181    pendingWatcher.prepare(this);
182    if (canCreateBaseZNode) {
183      try {
184        createBaseZNodes();
185      } catch (ZooKeeperConnectionException zce) {
186        try {
187          this.recoverableZooKeeper.close();
188        } catch (InterruptedException ie) {
189          LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper);
190          Thread.currentThread().interrupt();
191        }
192        throw zce;
193      }
194    }
195    this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
196      HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
197  }
198
199  public List<ACL> createACL(String node) {
200    return createACL(node, ZKAuthentication.isSecureZooKeeper(getConfiguration()));
201  }
202
203  public List<ACL> createACL(String node, boolean isSecureZooKeeper) {
204    if (!node.startsWith(getZNodePaths().baseZNode)) {
205      return Ids.OPEN_ACL_UNSAFE;
206    }
207    if (isSecureZooKeeper) {
208      ArrayList<ACL> acls = new ArrayList<>();
209      // add permission to hbase supper user
210      String[] superUsers = getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
211      String hbaseUser = null;
212      try {
213        hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
214      } catch (IOException e) {
215        LOG.debug("Could not acquire current User.", e);
216      }
217      if (superUsers != null) {
218        List<String> groups = new ArrayList<>();
219        for (String user : superUsers) {
220          if (AuthUtil.isGroupPrincipal(user)) {
221            // TODO: Set node ACL for groups when ZK supports this feature
222            groups.add(user);
223          } else {
224            if (!user.equals(hbaseUser)) {
225              acls.add(new ACL(Perms.ALL, new Id("sasl", user)));
226            }
227          }
228        }
229        if (!groups.isEmpty()) {
230          LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this "
231            + "feature presently.", groups);
232        }
233      }
234      // Certain znodes are accessed directly by the client,
235      // so they must be readable by non-authenticated clients
236      if (getZNodePaths().isClientReadable(node)) {
237        acls.addAll(Ids.CREATOR_ALL_ACL);
238        acls.addAll(Ids.READ_ACL_UNSAFE);
239      } else {
240        acls.addAll(Ids.CREATOR_ALL_ACL);
241      }
242      return acls;
243    } else {
244      return Ids.OPEN_ACL_UNSAFE;
245    }
246  }
247
248  private void createBaseZNodes() throws ZooKeeperConnectionException {
249    try {
250      // Create all the necessary "directories" of znodes
251      ZKUtil.createWithParents(this, znodePaths.baseZNode);
252      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
253      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
254      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
255      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
256      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
257      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
258    } catch (KeeperException e) {
259      throw new ZooKeeperConnectionException(
260        prefix("Unexpected KeeperException creating base node"), e);
261    }
262  }
263
264  /**
265   * On master start, we check the znode ACLs under the root directory and set the ACLs properly if
266   * needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed so
267   * that the existing znodes created with open permissions are now changed with restrictive perms.
268   */
269  public void checkAndSetZNodeAcls() {
270    if (!ZKAuthentication.isSecureZooKeeper(getConfiguration())) {
271      LOG.info("not a secure deployment, proceeding");
272      return;
273    }
274
275    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
276    // correct.
277    try {
278      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
279
280      if (!isBaseZnodeAclSetup(actualAcls)) {
281        LOG.info("setting znode ACLs");
282        setZnodeAclsRecursive(znodePaths.baseZNode);
283      }
284    } catch (KeeperException.NoNodeException nne) {
285      return;
286    } catch (InterruptedException ie) {
287      interruptedExceptionNoThrow(ie, false);
288    } catch (IOException | KeeperException e) {
289      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
290    }
291  }
292
293  /**
294   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs will
295   * be set last in case the master fails in between.
296   * @param znode the ZNode to set the permissions for
297   */
298  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
299    List<String> children = recoverableZooKeeper.getChildren(znode, false);
300
301    for (String child : children) {
302      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
303    }
304    List<ACL> acls = createACL(znode, true);
305    LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
306    recoverableZooKeeper.setAcl(znode, acls, -1);
307  }
308
309  /**
310   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
311   * @param acls acls from zookeeper
312   * @return whether ACLs are set for the base znode
313   * @throws IOException if getting the current user fails
314   */
315  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
316    if (LOG.isDebugEnabled()) {
317      LOG.debug("Checking znode ACLs");
318    }
319    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
320    // Check whether ACL set for all superusers
321    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
322      return false;
323    }
324
325    // this assumes that current authenticated user is the same as zookeeper client user
326    // configured via JAAS
327    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
328
329    if (acls.isEmpty()) {
330      if (LOG.isDebugEnabled()) {
331        LOG.debug("ACL is empty");
332      }
333      return false;
334    }
335
336    for (ACL acl : acls) {
337      int perms = acl.getPerms();
338      Id id = acl.getId();
339      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
340      // and one for the hbase user
341      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
342        if (perms != Perms.READ) {
343          if (LOG.isDebugEnabled()) {
344            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
345              id, perms, Perms.READ));
346          }
347          return false;
348        }
349      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
350        if (perms != Perms.ALL) {
351          if (LOG.isDebugEnabled()) {
352            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
353              id, perms, Perms.ALL));
354          }
355          return false;
356        }
357      } else if ("sasl".equals(id.getScheme())) {
358        String name = id.getId();
359        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
360        Matcher match = NAME_PATTERN.matcher(name);
361        if (match.matches()) {
362          name = match.group(1);
363        }
364        if (name.equals(hbaseUser)) {
365          if (perms != Perms.ALL) {
366            if (LOG.isDebugEnabled()) {
367              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
368                id, perms, Perms.ALL));
369            }
370            return false;
371          }
372        } else {
373          if (LOG.isDebugEnabled()) {
374            LOG.debug("Unexpected shortname in SASL ACL: {}", id);
375          }
376          return false;
377        }
378      } else {
379        if (LOG.isDebugEnabled()) {
380          LOG.debug("unexpected ACL id '{}'", id);
381        }
382        return false;
383      }
384    }
385    return true;
386  }
387
388  /*
389   * Validate whether ACL set for all superusers.
390   */
391  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
392    for (String user : superUsers) {
393      boolean hasAccess = false;
394      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
395      if (!AuthUtil.isGroupPrincipal(user)) {
396        for (ACL acl : acls) {
397          if (user.equals(acl.getId().getId())) {
398            if (acl.getPerms() == Perms.ALL) {
399              hasAccess = true;
400            } else {
401              if (LOG.isDebugEnabled()) {
402                LOG.debug(String.format(
403                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
404                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
405              }
406            }
407            break;
408          }
409        }
410        if (!hasAccess) {
411          return false;
412        }
413      }
414    }
415    return true;
416  }
417
418  /*
419   * Validate whether ACL ID is superuser.
420   */
421  public static boolean isSuperUserId(String[] superUsers, Id id) {
422    for (String user : superUsers) {
423      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
424      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
425        return true;
426      }
427    }
428    return false;
429  }
430
431  @Override
432  public String toString() {
433    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
434  }
435
436  /**
437   * Adds this instance's identifier as a prefix to the passed <code>str</code>
438   * @param str String to amend.
439   * @return A new string with this instance's identifier as prefix: e.g. if passed 'hello world',
440   *         the returned string could be
441   */
442  public String prefix(final String str) {
443    return this.toString() + " " + str;
444  }
445
446  /**
447   * Get the znodes corresponding to the meta replicas from ZK
448   * @return list of znodes
449   * @throws KeeperException if a ZooKeeper operation fails
450   */
451  public List<String> getMetaReplicaNodes() throws KeeperException {
452    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
453    return filterMetaReplicaNodes(childrenOfBaseNode);
454  }
455
456  /**
457   * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode
458   * for subsequent CREATE/DELETE operations on child nodes.
459   */
460  public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException {
461    List<String> childrenOfBaseNode =
462      ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode);
463    return filterMetaReplicaNodes(childrenOfBaseNode);
464  }
465
466  /**
467   * @param nodes Input list of znodes
468   * @return Filtered list of znodes from nodes that belong to meta replica(s).
469   */
470  private List<String> filterMetaReplicaNodes(List<String> nodes) {
471    if (nodes == null || nodes.isEmpty()) {
472      return new ArrayList<>();
473    }
474    List<String> metaReplicaNodes = new ArrayList<>(2);
475    String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX);
476    for (String child : nodes) {
477      if (child.startsWith(pattern)) {
478        metaReplicaNodes.add(child);
479      }
480    }
481    return metaReplicaNodes;
482  }
483
484  /**
485   * Register the specified listener to receive ZooKeeper events.
486   * @param listener the listener to register
487   */
488  public void registerListener(ZKListener listener) {
489    listeners.add(listener);
490  }
491
492  /**
493   * Register the specified listener to receive ZooKeeper events and add it as the first in the list
494   * of current listeners.
495   * @param listener the listener to register
496   */
497  public void registerListenerFirst(ZKListener listener) {
498    listeners.add(0, listener);
499  }
500
501  public void unregisterListener(ZKListener listener) {
502    listeners.remove(listener);
503  }
504
505  /**
506   * Clean all existing listeners
507   */
508  public void unregisterAllListeners() {
509    listeners.clear();
510  }
511
512  /**
513   * Get a copy of current registered listeners
514   */
515  public List<ZKListener> getListeners() {
516    return new ArrayList<>(listeners);
517  }
518
519  /** Returns The number of currently registered listeners */
520  public int getNumberOfListeners() {
521    return listeners.size();
522  }
523
524  /**
525   * Get the connection to ZooKeeper.
526   * @return connection reference to zookeeper
527   */
528  public RecoverableZooKeeper getRecoverableZooKeeper() {
529    return recoverableZooKeeper;
530  }
531
532  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
533    recoverableZooKeeper.reconnectAfterExpiration();
534  }
535
536  /**
537   * Get the quorum address of this instance.
538   * @return quorum string of this zookeeper connection instance
539   */
540  public String getQuorum() {
541    return quorum;
542  }
543
544  /**
545   * Get the znodePaths.
546   * <p>
547   * Mainly used for mocking as mockito can not mock a field access.
548   */
549  public ZNodePaths getZNodePaths() {
550    return znodePaths;
551  }
552
553  private void processEvent(WatchedEvent event) {
554    TraceUtil.trace(() -> {
555      switch (event.getType()) {
556        // If event type is NONE, this is a connection status change
557        case None: {
558          connectionEvent(event);
559          break;
560        }
561
562        // Otherwise pass along to the listeners
563        case NodeCreated: {
564          for (ZKListener listener : listeners) {
565            listener.nodeCreated(event.getPath());
566          }
567          break;
568        }
569
570        case NodeDeleted: {
571          for (ZKListener listener : listeners) {
572            listener.nodeDeleted(event.getPath());
573          }
574          break;
575        }
576
577        case NodeDataChanged: {
578          for (ZKListener listener : listeners) {
579            listener.nodeDataChanged(event.getPath());
580          }
581          break;
582        }
583
584        case NodeChildrenChanged: {
585          for (ZKListener listener : listeners) {
586            listener.nodeChildrenChanged(event.getPath());
587          }
588          break;
589        }
590        default:
591          LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(),
592            event.getPath());
593      }
594    }, "ZKWatcher.processEvent: " + event.getType() + " " + event.getPath());
595  }
596
597  /**
598   * Method called from ZooKeeper for events and connection status.
599   * <p>
600   * Valid events are passed along to listeners. Connection status changes are dealt with locally.
601   */
602  @Override
603  public void process(WatchedEvent event) {
604    LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state="
605      + event.getState() + ", " + "path=" + event.getPath()));
606    final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier;
607    if (!zkEventProcessor.isShutdown()) {
608      zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> processEvent(event), spanName));
609    }
610  }
611
612  // Connection management
613
614  /**
615   * Called when there is a connection-related event via the Watcher callback.
616   * <p>
617   * If Disconnected or Expired, this should shutdown the cluster. But, since we send a
618   * KeeperException.SessionExpiredException along with the abort call, it's possible for the
619   * Abortable to catch it and try to create a new session with ZooKeeper. This is what the client
620   * does in HCM.
621   * <p>
622   * @param event the connection-related event
623   */
624  private void connectionEvent(WatchedEvent event) {
625    switch (event.getState()) {
626      case SyncConnected:
627        this.identifier =
628          this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId());
629        // Update our identifier. Otherwise ignore.
630        LOG.debug("{} connected", this.identifier);
631        break;
632
633      // Abort the server if Disconnected or Expired
634      case Disconnected:
635        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
636        break;
637
638      case Closed:
639        LOG.debug(prefix("ZooKeeper client closed, ignoring"));
640        break;
641
642      case Expired:
643        String msg = prefix(this.identifier + " received expired from " + "ZooKeeper, aborting");
644        // TODO: One thought is to add call to ZKListener so say,
645        // ZKNodeTracker can zero out its data values.
646        if (this.abortable != null) {
647          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
648        }
649        break;
650
651      case ConnectedReadOnly:
652      case SaslAuthenticated:
653      case AuthFailed:
654        break;
655
656      default:
657        throw new IllegalStateException("Received event is not valid: " + event.getState());
658    }
659  }
660
661  /**
662   * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a
663   * timeout lets the callers fail-fast rather than wait forever for the sync to finish.
664   * <p>
665   * Executing this method before running other methods will ensure that the subsequent operations
666   * are up-to-date and consistent as of the time that the sync is complete.
667   * <p>
668   * This is used for compareAndSwap type operations where we need to read the data of an existing
669   * node and delete or transition that node, utilizing the previously read version and data. We
670   * want to ensure that the version read is up-to-date from when we begin the operation.
671   * <p>
672   */
673  public void syncOrTimeout(String path) throws KeeperException {
674    final CountDownLatch latch = new CountDownLatch(1);
675    long startTime = EnvironmentEdgeManager.currentTime();
676    this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null);
677    try {
678      if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
679        LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points "
680          + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout);
681        throw new KeeperException.RequestTimeoutException();
682      }
683    } catch (InterruptedException e) {
684      LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
685      Thread.currentThread().interrupt();
686      return;
687    }
688    if (LOG.isDebugEnabled()) {
689      // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a
690      // useful metric to have since the latency of sync() impacts the callers.
691      LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime);
692    }
693  }
694
695  /**
696   * Handles KeeperExceptions in client calls.
697   * <p>
698   * This may be temporary but for now this gives one place to deal with these.
699   * <p>
700   * TODO: Currently this method rethrows the exception to let the caller handle
701   * <p>
702   * @param ke the exception to rethrow
703   * @throws KeeperException if a ZooKeeper operation fails
704   */
705  public void keeperException(KeeperException ke) throws KeeperException {
706    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
707    throw ke;
708  }
709
710  /**
711   * Handles InterruptedExceptions in client calls.
712   * @param ie the InterruptedException instance thrown
713   * @throws KeeperException the exception to throw, transformed from the InterruptedException
714   */
715  public void interruptedException(InterruptedException ie) throws KeeperException {
716    interruptedExceptionNoThrow(ie, true);
717    // Throw a system error exception to let upper level handle it
718    KeeperException keeperException = new KeeperException.SystemErrorException();
719    keeperException.initCause(ie);
720    throw keeperException;
721  }
722
723  /**
724   * Log the InterruptedException and interrupt current thread
725   * @param ie         The IterruptedException to log
726   * @param throwLater Whether we will throw the exception latter
727   */
728  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
729    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
730      + (throwLater ? " and rethrow a SystemErrorException" : "")), ie);
731    // At least preserve interrupt.
732    Thread.currentThread().interrupt();
733  }
734
735  /**
736   * Close the connection to ZooKeeper.
737   */
738  @Override
739  public void close() {
740    zkEventProcessor.shutdown();
741    try {
742      if (!zkEventProcessor.awaitTermination(15, TimeUnit.SECONDS)) {
743        LOG.warn("ZKWatcher event processor has not finished to terminate.");
744        zkEventProcessor.shutdownNow();
745      }
746    } catch (InterruptedException e) {
747      Thread.currentThread().interrupt();
748    } finally {
749      try {
750        recoverableZooKeeper.close();
751      } catch (InterruptedException e) {
752        Thread.currentThread().interrupt();
753      }
754    }
755  }
756
757  public Configuration getConfiguration() {
758    return conf;
759  }
760
761  @Override
762  public void abort(String why, Throwable e) {
763    if (this.abortable != null) {
764      this.abortable.abort(why, e);
765    } else {
766      this.aborted = true;
767    }
768  }
769
770  @Override
771  public boolean isAborted() {
772    return this.abortable == null ? this.aborted : this.abortable.isAborted();
773  }
774}