001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.AuthUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ZooKeeperConnectionException;
036import org.apache.hadoop.hbase.security.Superusers;
037import org.apache.hadoop.hbase.trace.TraceUtil;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hadoop.security.UserGroupInformation;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.zookeeper.KeeperException;
043import org.apache.zookeeper.WatchedEvent;
044import org.apache.zookeeper.Watcher;
045import org.apache.zookeeper.ZooDefs.Ids;
046import org.apache.zookeeper.ZooDefs.Perms;
047import org.apache.zookeeper.data.ACL;
048import org.apache.zookeeper.data.Id;
049import org.apache.zookeeper.data.Stat;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054
055/**
056 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated for each Master,
057 * RegionServer, and client process.
058 * <p>
059 * This is the only class that implements {@link Watcher}. Other internal classes which need to be
060 * notified of ZooKeeper events must register with the local instance of this watcher via
061 * {@link #registerListener}.
062 * <p>
 * This class also holds and manages the connection to ZooKeeper. Connection-related events and
 * exceptions are handled here.
065 */
066@InterfaceAudience.Private
067public class ZKWatcher implements Watcher, Abortable, Closeable {
068  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);
069
070  // Identifier for this watcher (for logging only). It is made of the prefix
071  // passed on construction and the zookeeper sessionid.
072  private final String prefix;
073  private String identifier;
074
075  // zookeeper quorum
076  private final String quorum;
077
078  // zookeeper connection
079  private final RecoverableZooKeeper recoverableZooKeeper;
080
081  // abortable in case of zk failure
082  protected Abortable abortable;
083  // Used if abortable is null
084  private boolean aborted = false;
085
086  private final ZNodePaths znodePaths;
087
088  // listeners to be notified
089  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
090
091  // Single threaded executor pool that processes event notifications from Zookeeper. Events are
092  // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do
093  // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context.
094  // EventThread internally runs a single while loop to serially process all the events. When events
095  // are processed by the listeners in the same thread, that blocks the EventThread from processing
096  // subsequent events. Processing events in a separate thread frees up the event thread to continue
097  // and further prevents deadlocks if the process method itself makes other zookeeper calls.
098  // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
099  // requests using a single while loop and hence there is no performance degradation.
100  private final ExecutorService zkEventProcessor = Executors
101    .newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d")
102      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
103
104  private final Configuration conf;
105
106  private final long zkSyncTimeout;
107
108  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
109  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
110
111  /**
112   * Instantiate a ZooKeeper connection and watcher.
113   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
114   *                   this instance. Use null for default.
115   * @throws IOException                  if the connection to ZooKeeper fails
116   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
117   */
118  public ZKWatcher(Configuration conf, String identifier, Abortable abortable)
119    throws ZooKeeperConnectionException, IOException {
120    this(conf, identifier, abortable, false);
121  }
122
123  /**
124   * Instantiate a ZooKeeper connection and watcher.
125   * @param conf               the configuration to use
126   * @param identifier         string that is passed to RecoverableZookeeper to be used as
127   *                           identifier for this instance. Use null for default.
128   * @param abortable          Can be null if there is on error there is no host to abort: e.g.
129   *                           client context.
130   * @param canCreateBaseZNode true if a base ZNode can be created
131   * @throws IOException                  if the connection to ZooKeeper fails
132   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
133   */
134  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
135    boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException {
136    this(conf, identifier, abortable, canCreateBaseZNode, false);
137  }
138
139  /**
140   * Instantiate a ZooKeeper connection and watcher.
141   * @param conf               the configuration to use
142   * @param identifier         string that is passed to RecoverableZookeeper to be used as
143   *                           identifier for this instance. Use null for default.
144   * @param abortable          Can be null if there is on error there is no host to abort: e.g.
145   *                           client context.
146   * @param canCreateBaseZNode true if a base ZNode can be created
147   * @param clientZK           whether this watcher is set to access client ZK
148   * @throws IOException                  if the connection to ZooKeeper fails
149   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
150   *                                      ZNodes
151   */
152  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
153    boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException {
154    this.conf = conf;
155    if (clientZK) {
156      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
157      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
158      if (clientZkQuorumServers != null) {
159        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
160          // Don't allow same settings to avoid dead loop when master trying
161          // to sync meta information from server ZK to client ZK
162          throw new IllegalArgumentException(
163            "The quorum settings for client ZK should be different from those for server");
164        }
165        this.quorum = clientZkQuorumServers;
166      } else {
167        this.quorum = serverZkQuorumServers;
168      }
169    } else {
170      this.quorum = ZKConfig.getZKQuorumServersString(conf);
171    }
172    this.prefix = identifier;
173    // Identifier will get the sessionid appended later below down when we
174    // handle the syncconnect event.
175    this.identifier = identifier + "0x0";
176    this.abortable = abortable;
177    this.znodePaths = new ZNodePaths(conf);
178    PendingWatcher pendingWatcher = new PendingWatcher();
179    this.recoverableZooKeeper = RecoverableZooKeeper.connect(conf, quorum, pendingWatcher,
180      identifier, ZKConfig.getZKClientConfig(conf));
181    pendingWatcher.prepare(this);
182    if (canCreateBaseZNode) {
183      try {
184        createBaseZNodes();
185      } catch (ZooKeeperConnectionException zce) {
186        try {
187          this.recoverableZooKeeper.close();
188        } catch (InterruptedException ie) {
189          LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper);
190          Thread.currentThread().interrupt();
191        }
192        throw zce;
193      }
194    }
195    this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
196      HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
197  }
198
199  public List<ACL> createACL(String node) {
200    return createACL(node, ZKAuthentication.isSecureZooKeeper(getConfiguration()));
201  }
202
203  public List<ACL> createACL(String node, boolean isSecureZooKeeper) {
204    if (!node.startsWith(getZNodePaths().baseZNode)) {
205      return Ids.OPEN_ACL_UNSAFE;
206    }
207    if (isSecureZooKeeper) {
208      ArrayList<ACL> acls = new ArrayList<>();
209      // add permission to hbase supper user
210      String[] superUsers = getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
211      String hbaseUser = null;
212      try {
213        hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
214      } catch (IOException e) {
215        LOG.debug("Could not acquire current User.", e);
216      }
217      if (superUsers != null) {
218        List<String> groups = new ArrayList<>();
219        for (String user : superUsers) {
220          if (AuthUtil.isGroupPrincipal(user)) {
221            // TODO: Set node ACL for groups when ZK supports this feature
222            groups.add(user);
223          } else {
224            if (!user.equals(hbaseUser)) {
225              acls.add(new ACL(Perms.ALL, new Id("sasl", user)));
226            }
227          }
228        }
229        if (!groups.isEmpty()) {
230          LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this "
231            + "feature presently.", groups);
232        }
233      }
234      // Certain znodes are accessed directly by the client,
235      // so they must be readable by non-authenticated clients
236      if (getZNodePaths().isClientReadable(node)) {
237        acls.addAll(Ids.CREATOR_ALL_ACL);
238        acls.addAll(Ids.READ_ACL_UNSAFE);
239      } else {
240        acls.addAll(Ids.CREATOR_ALL_ACL);
241      }
242      return acls;
243    } else {
244      return Ids.OPEN_ACL_UNSAFE;
245    }
246  }
247
248  private void createBaseZNodes() throws ZooKeeperConnectionException {
249    try {
250      // Create all the necessary "directories" of znodes
251      ZKUtil.createWithParents(this, znodePaths.baseZNode);
252      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
253      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
254      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
255      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
256      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
257      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
258    } catch (KeeperException e) {
259      throw new ZooKeeperConnectionException(
260        prefix("Unexpected KeeperException creating base node"), e);
261    }
262  }
263
264  /**
265   * On master start, we check the znode ACLs under the root directory and set the ACLs properly if
266   * needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed so
267   * that the existing znodes created with open permissions are now changed with restrictive perms.
268   */
269  public void checkAndSetZNodeAcls() {
270    if (!ZKAuthentication.isSecureZooKeeper(getConfiguration())) {
271      LOG.info("not a secure deployment, proceeding");
272      return;
273    }
274
275    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
276    // correct.
277    try {
278      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
279
280      if (!isBaseZnodeAclSetup(actualAcls)) {
281        LOG.info("setting znode ACLs");
282        setZnodeAclsRecursive(znodePaths.baseZNode);
283      }
284    } catch (KeeperException.NoNodeException nne) {
285      return;
286    } catch (InterruptedException ie) {
287      interruptedExceptionNoThrow(ie, false);
288    } catch (IOException | KeeperException e) {
289      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
290    }
291  }
292
293  /**
294   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs will
295   * be set last in case the master fails in between.
296   * @param znode the ZNode to set the permissions for
297   */
298  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
299    List<String> children = recoverableZooKeeper.getChildren(znode, false);
300
301    for (String child : children) {
302      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
303    }
304    List<ACL> acls = createACL(znode, true);
305    LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
306    recoverableZooKeeper.setAcl(znode, acls, -1);
307  }
308
309  /**
310   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
311   * @param acls acls from zookeeper
312   * @return whether ACLs are set for the base znode
313   * @throws IOException if getting the current user fails
314   */
315  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
316    if (LOG.isDebugEnabled()) {
317      LOG.debug("Checking znode ACLs");
318    }
319    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
320    // Check whether ACL set for all superusers
321    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
322      return false;
323    }
324
325    // this assumes that current authenticated user is the same as zookeeper client user
326    // configured via JAAS
327    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
328
329    if (acls.isEmpty()) {
330      if (LOG.isDebugEnabled()) {
331        LOG.debug("ACL is empty");
332      }
333      return false;
334    }
335
336    for (ACL acl : acls) {
337      int perms = acl.getPerms();
338      Id id = acl.getId();
339      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
340      // and one for the hbase user
341      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
342        if (perms != Perms.READ) {
343          if (LOG.isDebugEnabled()) {
344            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
345              id, perms, Perms.READ));
346          }
347          return false;
348        }
349      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
350        if (perms != Perms.ALL) {
351          if (LOG.isDebugEnabled()) {
352            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
353              id, perms, Perms.ALL));
354          }
355          return false;
356        }
357      } else if ("sasl".equals(id.getScheme())) {
358        String name = id.getId();
359        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
360        Matcher match = NAME_PATTERN.matcher(name);
361        if (match.matches()) {
362          name = match.group(1);
363        }
364        if (name.equals(hbaseUser)) {
365          if (perms != Perms.ALL) {
366            if (LOG.isDebugEnabled()) {
367              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
368                id, perms, Perms.ALL));
369            }
370            return false;
371          }
372        } else {
373          if (LOG.isDebugEnabled()) {
374            LOG.debug("Unexpected shortname in SASL ACL: {}", id);
375          }
376          return false;
377        }
378      } else {
379        if (LOG.isDebugEnabled()) {
380          LOG.debug("unexpected ACL id '{}'", id);
381        }
382        return false;
383      }
384    }
385    return true;
386  }
387
388  /*
389   * Validate whether ACL set for all superusers.
390   */
391  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
392    for (String user : superUsers) {
393      boolean hasAccess = false;
394      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
395      if (!AuthUtil.isGroupPrincipal(user)) {
396        for (ACL acl : acls) {
397          if (user.equals(acl.getId().getId())) {
398            if (acl.getPerms() == Perms.ALL) {
399              hasAccess = true;
400            } else {
401              if (LOG.isDebugEnabled()) {
402                LOG.debug(String.format(
403                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
404                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
405              }
406            }
407            break;
408          }
409        }
410        if (!hasAccess) {
411          return false;
412        }
413      }
414    }
415    return true;
416  }
417
418  /*
419   * Validate whether ACL ID is superuser.
420   */
421  public static boolean isSuperUserId(String[] superUsers, Id id) {
422    for (String user : superUsers) {
423      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
424      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
425        return true;
426      }
427    }
428    return false;
429  }
430
431  @Override
432  public String toString() {
433    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
434  }
435
436  /**
437   * Adds this instance's identifier as a prefix to the passed <code>str</code>
438   * @param str String to amend.
439   * @return A new string with this instance's identifier as prefix: e.g. if passed 'hello world',
440   *         the returned string could be
441   */
442  public String prefix(final String str) {
443    return this.toString() + " " + str;
444  }
445
446  /**
447   * Get the znodes corresponding to the meta replicas from ZK
448   * @return list of znodes
449   * @throws KeeperException if a ZooKeeper operation fails
450   */
451  public List<String> getMetaReplicaNodes() throws KeeperException {
452    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
453    return filterMetaReplicaNodes(childrenOfBaseNode);
454  }
455
456  /**
457   * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode
458   * for subsequent CREATE/DELETE operations on child nodes.
459   */
460  public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException {
461    List<String> childrenOfBaseNode =
462      ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode);
463    // Need to throw here instead of returning an empty list if the base znode hasn't been created
464    // Caller should retry in that case, versus thinking the base znode has a watcher set
465    if (childrenOfBaseNode == null) {
466      keeperException(new KeeperException.NoNodeException(znodePaths.baseZNode));
467    }
468    return filterMetaReplicaNodes(childrenOfBaseNode);
469  }
470
471  /**
472   * @param nodes Input list of znodes
473   * @return Filtered list of znodes from nodes that belong to meta replica(s).
474   */
475  private List<String> filterMetaReplicaNodes(List<String> nodes) {
476    if (nodes == null || nodes.isEmpty()) {
477      return new ArrayList<>();
478    }
479    List<String> metaReplicaNodes = new ArrayList<>(2);
480    String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX);
481    for (String child : nodes) {
482      if (child.startsWith(pattern)) {
483        metaReplicaNodes.add(child);
484      }
485    }
486    return metaReplicaNodes;
487  }
488
489  /**
490   * Register the specified listener to receive ZooKeeper events.
491   * @param listener the listener to register
492   */
493  public void registerListener(ZKListener listener) {
494    listeners.add(listener);
495  }
496
497  /**
498   * Register the specified listener to receive ZooKeeper events and add it as the first in the list
499   * of current listeners.
500   * @param listener the listener to register
501   */
502  public void registerListenerFirst(ZKListener listener) {
503    listeners.add(0, listener);
504  }
505
506  public void unregisterListener(ZKListener listener) {
507    listeners.remove(listener);
508  }
509
510  /**
511   * Clean all existing listeners
512   */
513  public void unregisterAllListeners() {
514    listeners.clear();
515  }
516
517  /**
518   * Get a copy of current registered listeners
519   */
520  public List<ZKListener> getListeners() {
521    return new ArrayList<>(listeners);
522  }
523
524  /** Returns The number of currently registered listeners */
525  public int getNumberOfListeners() {
526    return listeners.size();
527  }
528
529  /**
530   * Get the connection to ZooKeeper.
531   * @return connection reference to zookeeper
532   */
533  public RecoverableZooKeeper getRecoverableZooKeeper() {
534    return recoverableZooKeeper;
535  }
536
537  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
538    recoverableZooKeeper.reconnectAfterExpiration();
539  }
540
541  /**
542   * Get the quorum address of this instance.
543   * @return quorum string of this zookeeper connection instance
544   */
545  public String getQuorum() {
546    return quorum;
547  }
548
549  /**
550   * Get the znodePaths.
551   * <p>
552   * Mainly used for mocking as mockito can not mock a field access.
553   */
554  public ZNodePaths getZNodePaths() {
555    return znodePaths;
556  }
557
558  private void processEvent(WatchedEvent event) {
559    TraceUtil.trace(() -> {
560      switch (event.getType()) {
561        // If event type is NONE, this is a connection status change
562        case None: {
563          connectionEvent(event);
564          break;
565        }
566
567        // Otherwise pass along to the listeners
568        case NodeCreated: {
569          for (ZKListener listener : listeners) {
570            listener.nodeCreated(event.getPath());
571          }
572          break;
573        }
574
575        case NodeDeleted: {
576          for (ZKListener listener : listeners) {
577            listener.nodeDeleted(event.getPath());
578          }
579          break;
580        }
581
582        case NodeDataChanged: {
583          for (ZKListener listener : listeners) {
584            listener.nodeDataChanged(event.getPath());
585          }
586          break;
587        }
588
589        case NodeChildrenChanged: {
590          for (ZKListener listener : listeners) {
591            listener.nodeChildrenChanged(event.getPath());
592          }
593          break;
594        }
595        default:
596          LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(),
597            event.getPath());
598      }
599    }, "ZKWatcher.processEvent: " + event.getType() + " " + event.getPath());
600  }
601
602  /**
603   * Method called from ZooKeeper for events and connection status.
604   * <p>
605   * Valid events are passed along to listeners. Connection status changes are dealt with locally.
606   */
607  @Override
608  public void process(WatchedEvent event) {
609    LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state="
610      + event.getState() + ", " + "path=" + event.getPath()));
611    final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier;
612    if (!zkEventProcessor.isShutdown()) {
613      zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> processEvent(event), spanName));
614    }
615  }
616
617  // Connection management
618
619  /**
620   * Called when there is a connection-related event via the Watcher callback.
621   * <p>
622   * If Disconnected or Expired, this should shutdown the cluster. But, since we send a
623   * KeeperException.SessionExpiredException along with the abort call, it's possible for the
624   * Abortable to catch it and try to create a new session with ZooKeeper. This is what the client
625   * does in HCM.
626   * <p>
627   * @param event the connection-related event
628   */
629  private void connectionEvent(WatchedEvent event) {
630    switch (event.getState()) {
631      case SyncConnected:
632        this.identifier =
633          this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId());
634        // Update our identifier. Otherwise ignore.
635        LOG.debug("{} connected", this.identifier);
636        break;
637
638      // Abort the server if Disconnected or Expired
639      case Disconnected:
640        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
641        break;
642
643      case Closed:
644        LOG.debug(prefix("ZooKeeper client closed, ignoring"));
645        break;
646
647      case Expired:
648        String msg = prefix(this.identifier + " received expired from " + "ZooKeeper, aborting");
649        // TODO: One thought is to add call to ZKListener so say,
650        // ZKNodeTracker can zero out its data values.
651        if (this.abortable != null) {
652          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
653        }
654        break;
655
656      case ConnectedReadOnly:
657      case SaslAuthenticated:
658      case AuthFailed:
659        break;
660
661      default:
662        throw new IllegalStateException("Received event is not valid: " + event.getState());
663    }
664  }
665
666  /**
667   * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a
668   * timeout lets the callers fail-fast rather than wait forever for the sync to finish.
669   * <p>
670   * Executing this method before running other methods will ensure that the subsequent operations
671   * are up-to-date and consistent as of the time that the sync is complete.
672   * <p>
673   * This is used for compareAndSwap type operations where we need to read the data of an existing
674   * node and delete or transition that node, utilizing the previously read version and data. We
675   * want to ensure that the version read is up-to-date from when we begin the operation.
676   * <p>
677   */
678  public void syncOrTimeout(String path) throws KeeperException {
679    final CountDownLatch latch = new CountDownLatch(1);
680    long startTime = EnvironmentEdgeManager.currentTime();
681    this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null);
682    try {
683      if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
684        LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points "
685          + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout);
686        throw new KeeperException.RequestTimeoutException();
687      }
688    } catch (InterruptedException e) {
689      LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
690      Thread.currentThread().interrupt();
691      return;
692    }
693    if (LOG.isDebugEnabled()) {
694      // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a
695      // useful metric to have since the latency of sync() impacts the callers.
696      LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime);
697    }
698  }
699
700  /**
701   * Handles KeeperExceptions in client calls.
702   * <p>
703   * This may be temporary but for now this gives one place to deal with these.
704   * <p>
705   * TODO: Currently this method rethrows the exception to let the caller handle
706   * <p>
707   * @param ke the exception to rethrow
708   * @throws KeeperException if a ZooKeeper operation fails
709   */
710  public void keeperException(KeeperException ke) throws KeeperException {
711    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
712    throw ke;
713  }
714
715  /**
716   * Handles InterruptedExceptions in client calls.
717   * @param ie the InterruptedException instance thrown
718   * @throws KeeperException the exception to throw, transformed from the InterruptedException
719   */
720  public void interruptedException(InterruptedException ie) throws KeeperException {
721    interruptedExceptionNoThrow(ie, true);
722    // Throw a system error exception to let upper level handle it
723    KeeperException keeperException = new KeeperException.SystemErrorException();
724    keeperException.initCause(ie);
725    throw keeperException;
726  }
727
728  /**
729   * Log the InterruptedException and interrupt current thread
730   * @param ie         The IterruptedException to log
731   * @param throwLater Whether we will throw the exception latter
732   */
733  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
734    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
735      + (throwLater ? " and rethrow a SystemErrorException" : "")), ie);
736    // At least preserve interrupt.
737    Thread.currentThread().interrupt();
738  }
739
740  /**
741   * Close the connection to ZooKeeper.
742   */
743  @Override
744  public void close() {
745    zkEventProcessor.shutdown();
746    try {
747      if (!zkEventProcessor.awaitTermination(15, TimeUnit.SECONDS)) {
748        LOG.warn("ZKWatcher event processor has not finished to terminate.");
749        zkEventProcessor.shutdownNow();
750      }
751    } catch (InterruptedException e) {
752      Thread.currentThread().interrupt();
753    } finally {
754      try {
755        recoverableZooKeeper.close();
756      } catch (InterruptedException e) {
757        Thread.currentThread().interrupt();
758      }
759    }
760  }
761
762  public Configuration getConfiguration() {
763    return conf;
764  }
765
766  @Override
767  public void abort(String why, Throwable e) {
768    if (this.abortable != null) {
769      this.abortable.abort(why, e);
770    } else {
771      this.aborted = true;
772    }
773  }
774
775  @Override
776  public boolean isAborted() {
777    return this.abortable == null ? this.aborted : this.abortable.isAborted();
778  }
779}