View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.CopyOnWriteArrayList;
28  import java.util.concurrent.CountDownLatch;
29  import java.util.regex.Matcher;
30  import java.util.regex.Pattern;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.AuthUtil;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.security.Superusers;
43  import org.apache.hadoop.security.UserGroupInformation;
44  import org.apache.zookeeper.KeeperException;
45  import org.apache.zookeeper.WatchedEvent;
46  import org.apache.zookeeper.Watcher;
47  import org.apache.zookeeper.ZooDefs;
48  import org.apache.zookeeper.ZooDefs.Ids;
49  import org.apache.zookeeper.ZooDefs.Perms;
50  import org.apache.zookeeper.data.ACL;
51  import org.apache.zookeeper.data.Id;
52  import org.apache.zookeeper.data.Stat;
53  
54  /**
55   * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
56   * for each Master, RegionServer, and client process.
57   *
58   * <p>This is the only class that implements {@link Watcher}.  Other internal
59   * classes which need to be notified of ZooKeeper events must register with
60   * the local instance of this watcher via {@link #registerListener}.
61   *
62   * <p>This class also holds and manages the connection to ZooKeeper.  Code to
63   * deal with connection related events and exceptions are handled here.
64   */
65  @InterfaceAudience.Private
66  public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
67    private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
68  
69    // Identifier for this watcher (for logging only).  It is made of the prefix
70    // passed on construction and the zookeeper sessionid.
71    private String prefix;
72    private String identifier;
73  
74    // zookeeper quorum
75    private String quorum;
76  
77    // zookeeper connection
78    private RecoverableZooKeeper recoverableZooKeeper;
79  
80    // abortable in case of zk failure
81    protected Abortable abortable;
82    // Used if abortable is null
83    private boolean aborted = false;
84  
85    // listeners to be notified
86    private final List<ZooKeeperListener> listeners =
87      new CopyOnWriteArrayList<ZooKeeperListener>();
88  
89    // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
90    // negotiation to complete
91    public CountDownLatch saslLatch = new CountDownLatch(1);
92  
93    // node names
94  
95    // base znode for this cluster
96    public String baseZNode;
97    //znodes containing the locations of the servers hosting the meta replicas
98    private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();
99    // znode containing ephemeral nodes of the regionservers
100   public String rsZNode;
101   // znode containing ephemeral nodes of the draining regionservers
102   public String drainingZNode;
103   // znode of currently active master
104   private String masterAddressZNode;
105   // znode of this master in backup master directory, if not the active master
106   public String backupMasterAddressesZNode;
107   // znode containing the current cluster state
108   public String clusterStateZNode;
109   // znode used for region transitioning and assignment
110   public String assignmentZNode;
111   // znode used for table disabling/enabling
112   public String tableZNode;
113   // znode containing the unique cluster ID
114   public String clusterIdZNode;
115   // znode used for log splitting work assignment
116   public String splitLogZNode;
117   // znode containing the state of the load balancer
118   public String balancerZNode;
119   // znode containing the state of region normalizer
120   private String regionNormalizerZNode;
121   // znode containing the lock for the tables
122   public String tableLockZNode;
123   // znode containing the state of recovering regions
124   public String recoveringRegionsZNode;
125   // znode containing namespace descriptors
126   public static String namespaceZNode = "namespace";
127 
128   // Certain ZooKeeper nodes need to be world-readable
129   public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
130     new ArrayList<ACL>() { {
131       add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
132       add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
133     }};
134 
135   public final static String META_ZNODE_PREFIX = "meta-region-server";
136 
137   private final Configuration conf;
138 
139   private final Exception constructorCaller;
140 
141   /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
142   private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
143 
144   /**
145    * Instantiate a ZooKeeper connection and watcher.
146    * @param identifier string that is passed to RecoverableZookeeper to be used as
147    * identifier for this instance. Use null for default.
148    * @throws IOException
149    * @throws ZooKeeperConnectionException
150    */
151   public ZooKeeperWatcher(Configuration conf, String identifier,
152       Abortable abortable) throws ZooKeeperConnectionException, IOException {
153     this(conf, identifier, abortable, false);
154   }
155 
156   /**
157    * Instantiate a ZooKeeper connection and watcher.
158    * @param conf
159    * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
160    *          this instance. Use null for default.
161    * @param abortable Can be null if there is on error there is no host to abort: e.g. client
162    *          context.
163    * @param canCreateBaseZNode
164    * @throws IOException
165    * @throws ZooKeeperConnectionException
166    */
167   public ZooKeeperWatcher(Configuration conf, String identifier,
168       Abortable abortable, boolean canCreateBaseZNode)
169   throws IOException, ZooKeeperConnectionException {
170     this.conf = conf;
171     // Capture a stack trace now.  Will print it out later if problem so we can
172     // distingush amongst the myriad ZKWs.
173     try {
174       throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
175     } catch (Exception e) {
176       this.constructorCaller = e;
177     }
178     this.quorum = ZKConfig.getZKQuorumServersString(conf);
179     this.prefix = identifier;
180     // Identifier will get the sessionid appended later below down when we
181     // handle the syncconnect event.
182     this.identifier = identifier + "0x0";
183     this.abortable = abortable;
184     setNodeNames(conf);
185     this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
186     if (canCreateBaseZNode) {
187       createBaseZNodes();
188     }
189   }
190 
191   private void createBaseZNodes() throws ZooKeeperConnectionException {
192     try {
193       // Create all the necessary "directories" of znodes
194       ZKUtil.createWithParents(this, baseZNode);
195       if (conf.getBoolean("hbase.assignment.usezk", true)) {
196         ZKUtil.createAndFailSilent(this, assignmentZNode);
197       }
198       ZKUtil.createAndFailSilent(this, rsZNode);
199       ZKUtil.createAndFailSilent(this, drainingZNode);
200       ZKUtil.createAndFailSilent(this, tableZNode);
201       ZKUtil.createAndFailSilent(this, splitLogZNode);
202       ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
203       ZKUtil.createAndFailSilent(this, tableLockZNode);
204       ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
205     } catch (KeeperException e) {
206       throw new ZooKeeperConnectionException(
207           prefix("Unexpected KeeperException creating base node"), e);
208     }
209   }
210 
211   /** Returns whether the znode is supposed to be readable by the client
212    * and DOES NOT contain sensitive information (world readable).*/
213   public boolean isClientReadable(String node) {
214     // Developer notice: These znodes are world readable. DO NOT add more znodes here UNLESS
215     // all clients need to access this data to work. Using zk for sharing data to clients (other
216     // than service lookup case is not a recommended design pattern.
217     return
218         node.equals(baseZNode) ||
219         isAnyMetaReplicaZnode(node) ||
220         node.equals(getMasterAddressZNode()) ||
221         node.equals(clusterIdZNode)||
222         node.equals(rsZNode) ||
223         // /hbase/table and /hbase/table/foo is allowed, /hbase/table-lock is not
224         node.equals(tableZNode) ||
225         node.startsWith(tableZNode + "/");
226   }
227 
228   /**
229    * On master start, we check the znode ACLs under the root directory and set the ACLs properly
230    * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
231    * so that the existing znodes created with open permissions are now changed with restrictive
232    * perms.
233    */
234   public void checkAndSetZNodeAcls() {
235     if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
236       LOG.info("not a secure deployment, proceeding");
237       return;
238     }
239 
240     // Check the base znodes permission first. Only do the recursion if base znode's perms are not
241     // correct.
242     try {
243       List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
244 
245       if (!isBaseZnodeAclSetup(actualAcls)) {
246         LOG.info("setting znode ACLs");
247         setZnodeAclsRecursive(baseZNode);
248       }
249     } catch(KeeperException.NoNodeException nne) {
250       return;
251     } catch(InterruptedException ie) {
252       interruptedExceptionNoThrow(ie, false);
253     } catch (IOException|KeeperException e) {
254       LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
255     }
256   }
257 
258   /**
259    * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
260    * will be set last in case the master fails in between.
261    * @param znode
262    */
263   private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
264     List<String> children = recoverableZooKeeper.getChildren(znode, false);
265 
266     for (String child : children) {
267       setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
268     }
269     List<ACL> acls = ZKUtil.createACL(this, znode, true);
270     LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
271     recoverableZooKeeper.setAcl(znode, acls, -1);
272   }
273 
274   /**
275    * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
276    * @param acls acls from zookeeper
277    * @return whether ACLs are set for the base znode
278    * @throws IOException
279    */
280   private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
281     if (LOG.isDebugEnabled()) {
282       LOG.debug("Checking znode ACLs");
283     }
284     String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
285     // Check whether ACL set for all superusers
286     if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
287       return false;
288     }
289 
290     // this assumes that current authenticated user is the same as zookeeper client user
291     // configured via JAAS
292     String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
293 
294     if (acls.isEmpty()) {
295       if (LOG.isDebugEnabled()) {
296         LOG.debug("ACL is empty");
297       }
298       return false;
299     }
300 
301     for (ACL acl : acls) {
302       int perms = acl.getPerms();
303       Id id = acl.getId();
304       // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
305       // and one for the hbase user
306       if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
307         if (perms != Perms.READ) {
308           if (LOG.isDebugEnabled()) {
309             LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
310               id, perms, Perms.READ));
311           }
312           return false;
313         }
314       } else if (superUsers != null && isSuperUserId(superUsers, id)) {
315         if (perms != Perms.ALL) {
316           if (LOG.isDebugEnabled()) {
317             LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
318               id, perms, Perms.ALL));
319           }
320           return false;
321         }
322       } else if ("sasl".equals(id.getScheme())) {
323         String name = id.getId();
324         // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
325         Matcher match = NAME_PATTERN.matcher(name);
326         if (match.matches()) {
327           name = match.group(1);
328         }
329         if (name.equals(hbaseUser)) {
330           if (perms != Perms.ALL) {
331             if (LOG.isDebugEnabled()) {
332               LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
333                 id, perms, Perms.ALL));
334             }
335             return false;
336           }
337         } else {
338           if (LOG.isDebugEnabled()) {
339             LOG.debug("Unexpected shortname in SASL ACL: " + id);
340           }
341           return false;
342         }
343       } else {
344         if (LOG.isDebugEnabled()) {
345           LOG.debug("unexpected ACL id '" + id + "'");
346         }
347         return false;
348       }
349     }
350     return true;
351   }
352   
353   /*
354    * Validate whether ACL set for all superusers.
355    */
356   private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
357     for (String user : superUsers) {
358       boolean hasAccess = false;
359       // TODO: Validate super group members also when ZK supports setting node ACL for groups.
360       if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
361         for (ACL acl : acls) {
362           if (user.equals(acl.getId().getId())) {
363             if (acl.getPerms() == Perms.ALL) {
364               hasAccess = true;
365             } else {
366               if (LOG.isDebugEnabled()) {
367                 LOG.debug(String.format(
368                   "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
369                   acl.getId().getId(), acl.getPerms(), Perms.ALL));
370               }
371             }
372             break;
373           }
374         }
375         if (!hasAccess) {
376           return false;
377         }
378       }
379     }
380     return true;
381   }
382   
383   /*
384    * Validate whether ACL ID is superuser.
385    */
386   public static boolean isSuperUserId(String[] superUsers, Id id) {
387     for (String user : superUsers) {
388       // TODO: Validate super group members also when ZK supports setting node ACL for groups.
389       if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
390         return true;
391       }
392     }
393     return false;
394   }
395 
396   @Override
397   public String toString() {
398     return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
399   }
400 
401   /**
402    * Adds this instance's identifier as a prefix to the passed <code>str</code>
403    * @param str String to amend.
404    * @return A new string with this instance's identifier as prefix: e.g.
405    * if passed 'hello world', the returned string could be
406    */
407   public String prefix(final String str) {
408     return this.toString() + " " + str;
409   }
410 
411   /**
412    * Set the local variable node names using the specified configuration.
413    */
414   private void setNodeNames(Configuration conf) {
415     baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
416         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
417     metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
418            conf.get("zookeeper.znode.metaserver", "meta-region-server")));
419     int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
420             HConstants.DEFAULT_META_REPLICA_NUM);
421     for (int i = 1; i < numMetaReplicas; i++) {
422       String str = ZKUtil.joinZNode(baseZNode,
423         conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
424       metaReplicaZnodes.put(i, str);
425     }
426     rsZNode = ZKUtil.joinZNode(baseZNode,
427         conf.get("zookeeper.znode.rs", "rs"));
428     drainingZNode = ZKUtil.joinZNode(baseZNode,
429         conf.get("zookeeper.znode.draining.rs", "draining"));
430     masterAddressZNode = ZKUtil.joinZNode(baseZNode,
431         conf.get("zookeeper.znode.master", "master"));
432     backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
433         conf.get("zookeeper.znode.backup.masters", "backup-masters"));
434     clusterStateZNode = ZKUtil.joinZNode(baseZNode,
435         conf.get("zookeeper.znode.state", "running"));
436     assignmentZNode = ZKUtil.joinZNode(baseZNode,
437         conf.get("zookeeper.znode.unassigned", "region-in-transition"));
438     tableZNode = ZKUtil.joinZNode(baseZNode,
439         conf.get("zookeeper.znode.tableEnableDisable", "table"));
440     clusterIdZNode = ZKUtil.joinZNode(baseZNode,
441         conf.get("zookeeper.znode.clusterId", "hbaseid"));
442     splitLogZNode = ZKUtil.joinZNode(baseZNode,
443         conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
444     balancerZNode = ZKUtil.joinZNode(baseZNode,
445         conf.get("zookeeper.znode.balancer", "balancer"));
446     regionNormalizerZNode = ZKUtil.joinZNode(baseZNode,
447       conf.get("zookeeper.znode.regionNormalizer", "normalizer"));
448     tableLockZNode = ZKUtil.joinZNode(baseZNode,
449         conf.get("zookeeper.znode.tableLock", "table-lock"));
450     recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
451         conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
452     namespaceZNode = ZKUtil.joinZNode(baseZNode,
453         conf.get("zookeeper.znode.namespace", "namespace"));
454   }
455 
456   /**
457    * Is the znode of any meta replica
458    * @param node
459    * @return true or false
460    */
461   public boolean isAnyMetaReplicaZnode(String node) {
462     if (metaReplicaZnodes.values().contains(node)) {
463       return true;
464     }
465     return false;
466   }
467 
468   /**
469    * Is it the default meta replica's znode
470    * @param node
471    * @return true or false
472    */
473   public boolean isDefaultMetaReplicaZnode(String node) {
474     if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
475       return true;
476     }
477     return false;
478   }
479 
480   /**
481    * Get the znodes corresponding to the meta replicas from ZK
482    * @return list of znodes
483    * @throws KeeperException
484    */
485   public List<String> getMetaReplicaNodes() throws KeeperException {
486     List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
487     List<String> metaReplicaNodes = new ArrayList<String>(2);
488     if (childrenOfBaseNode != null) {
489       String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
490       for (String child : childrenOfBaseNode) {
491         if (child.startsWith(pattern)) metaReplicaNodes.add(child);
492       }
493     }
494     return metaReplicaNodes;
495   }
496 
497   /**
498    * Get the znode string corresponding to a replicaId
499    * @param replicaId
500    * @return znode
501    */
502   public String getZNodeForReplica(int replicaId) {
503     String str = metaReplicaZnodes.get(replicaId);
504     // return a newly created path but don't update the cache of paths
505     // This is mostly needed for tests that attempt to create meta replicas
506     // from outside the master
507     if (str == null) {
508       str = ZKUtil.joinZNode(baseZNode,
509           conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
510     }
511     return str;
512   }
513 
514   /**
515    * Parse the meta replicaId from the passed znode
516    * @param znode
517    * @return replicaId
518    */
519   public int getMetaReplicaIdFromZnode(String znode) {
520     String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
521     if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
522     // the non-default replicas are of the pattern meta-region-server-<replicaId>
523     String nonDefaultPattern = pattern + "-";
524     return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
525   }
526 
527   /**
528    * Register the specified listener to receive ZooKeeper events.
529    * @param listener
530    */
531   public void registerListener(ZooKeeperListener listener) {
532     listeners.add(listener);
533   }
534 
535   /**
536    * Register the specified listener to receive ZooKeeper events and add it as
537    * the first in the list of current listeners.
538    * @param listener
539    */
540   public void registerListenerFirst(ZooKeeperListener listener) {
541     listeners.add(0, listener);
542   }
543 
544   public void unregisterListener(ZooKeeperListener listener) {
545     listeners.remove(listener);
546   }
547 
548   /**
549    * Clean all existing listeners
550    */
551   public void unregisterAllListeners() {
552     listeners.clear();
553   }
554 
555   /**
556    * Get a copy of current registered listeners
557    */
558   public List<ZooKeeperListener> getListeners() {
559     return new ArrayList<ZooKeeperListener>(listeners);
560   }
561 
562   /**
563    * @return The number of currently registered listeners
564    */
565   public int getNumberOfListeners() {
566     return listeners.size();
567   }
568 
569   /**
570    * Get the connection to ZooKeeper.
571    * @return connection reference to zookeeper
572    */
573   public RecoverableZooKeeper getRecoverableZooKeeper() {
574     return recoverableZooKeeper;
575   }
576 
577   public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
578     recoverableZooKeeper.reconnectAfterExpiration();
579   }
580 
581   /**
582    * Get the quorum address of this instance.
583    * @return quorum string of this zookeeper connection instance
584    */
585   public String getQuorum() {
586     return quorum;
587   }
588 
589   /**
590    * @return the base znode of this zookeeper connection instance.
591    */
592   public String getBaseZNode() {
593     return baseZNode;
594   }
595 
596   /**
597    * Method called from ZooKeeper for events and connection status.
598    * <p>
599    * Valid events are passed along to listeners.  Connection status changes
600    * are dealt with locally.
601    */
602   @Override
603   public void process(WatchedEvent event) {
604     LOG.debug(prefix("Received ZooKeeper Event, " +
605         "type=" + event.getType() + ", " +
606         "state=" + event.getState() + ", " +
607         "path=" + event.getPath()));
608 
609     switch(event.getType()) {
610 
611       // If event type is NONE, this is a connection status change
612       case None: {
613         connectionEvent(event);
614         break;
615       }
616 
617       // Otherwise pass along to the listeners
618 
619       case NodeCreated: {
620         for(ZooKeeperListener listener : listeners) {
621           listener.nodeCreated(event.getPath());
622         }
623         break;
624       }
625 
626       case NodeDeleted: {
627         for(ZooKeeperListener listener : listeners) {
628           listener.nodeDeleted(event.getPath());
629         }
630         break;
631       }
632 
633       case NodeDataChanged: {
634         for(ZooKeeperListener listener : listeners) {
635           listener.nodeDataChanged(event.getPath());
636         }
637         break;
638       }
639 
640       case NodeChildrenChanged: {
641         for(ZooKeeperListener listener : listeners) {
642           listener.nodeChildrenChanged(event.getPath());
643         }
644         break;
645       }
646     }
647   }
648 
649   // Connection management
650 
651   /**
652    * Called when there is a connection-related event via the Watcher callback.
653    * <p>
654    * If Disconnected or Expired, this should shutdown the cluster. But, since
655    * we send a KeeperException.SessionExpiredException along with the abort
656    * call, it's possible for the Abortable to catch it and try to create a new
657    * session with ZooKeeper. This is what the client does in HCM.
658    * <p>
659    * @param event
660    */
661   private void connectionEvent(WatchedEvent event) {
662     switch(event.getState()) {
663       case SyncConnected:
664         // Now, this callback can be invoked before the this.zookeeper is set.
665         // Wait a little while.
666         long finished = System.currentTimeMillis() +
667           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
668         while (System.currentTimeMillis() < finished) {
669           try {
670             Thread.sleep(1);
671           } catch (InterruptedException e) {
672             LOG.warn("Interrupted while sleeping");
673             throw new RuntimeException("Interrupted while waiting for" +
674                 " recoverableZooKeeper is set");
675           }
676           if (this.recoverableZooKeeper != null) break;
677         }
678 
679         if (this.recoverableZooKeeper == null) {
680           LOG.error("ZK is null on connection event -- see stack trace " +
681             "for the stack trace when constructor was called on this zkw",
682             this.constructorCaller);
683           throw new NullPointerException("ZK is null");
684         }
685         this.identifier = this.prefix + "-0x" +
686           Long.toHexString(this.recoverableZooKeeper.getSessionId());
687         // Update our identifier.  Otherwise ignore.
688         LOG.debug(this.identifier + " connected");
689         break;
690 
691       // Abort the server if Disconnected or Expired
692       case Disconnected:
693         LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
694         break;
695 
696       case Expired:
697         String msg = prefix(this.identifier + " received expired from " +
698           "ZooKeeper, aborting");
699         // TODO: One thought is to add call to ZooKeeperListener so say,
700         // ZooKeeperNodeTracker can zero out its data values.
701         if (this.abortable != null) {
702           this.abortable.abort(msg, new KeeperException.SessionExpiredException());
703         }
704         break;
705 
706       case ConnectedReadOnly:
707       case SaslAuthenticated:
708       case AuthFailed:
709         break;
710 
711       default:
712         throw new IllegalStateException("Received event is not valid: " + event.getState());
713     }
714   }
715 
716   /**
717    * Forces a synchronization of this ZooKeeper client connection.
718    * <p>
719    * Executing this method before running other methods will ensure that the
720    * subsequent operations are up-to-date and consistent as of the time that
721    * the sync is complete.
722    * <p>
723    * This is used for compareAndSwap type operations where we need to read the
724    * data of an existing node and delete or transition that node, utilizing the
725    * previously read version and data.  We want to ensure that the version read
726    * is up-to-date from when we begin the operation.
727    */
728   public void sync(String path) throws KeeperException {
729     this.recoverableZooKeeper.sync(path, null, null);
730   }
731 
732   /**
733    * Handles KeeperExceptions in client calls.
734    * <p>
735    * This may be temporary but for now this gives one place to deal with these.
736    * <p>
737    * TODO: Currently this method rethrows the exception to let the caller handle
738    * <p>
739    * @param ke
740    * @throws KeeperException
741    */
742   public void keeperException(KeeperException ke)
743   throws KeeperException {
744     LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
745     throw ke;
746   }
747 
748   /**
749    * Handles InterruptedExceptions in client calls.
750    * @param ie the InterruptedException instance thrown
751    * @throws KeeperException the exception to throw, transformed from the InterruptedException
752    */
753   public void interruptedException(InterruptedException ie) throws KeeperException {
754     interruptedExceptionNoThrow(ie, true);
755     // Throw a system error exception to let upper level handle it
756     throw new KeeperException.SystemErrorException();
757   }
758 
759   /**
760    * Log the InterruptedException and interrupt current thread
761    * @param ie The IterruptedException to log
762    * @param throwLater Whether we will throw the exception latter
763    */
764   public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
765     LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
766         + (throwLater ? " and rethrow a SystemErrorException" : "")),
767       ie);
768     // At least preserver interrupt.
769     Thread.currentThread().interrupt();
770   }
771 
772   /**
773    * Close the connection to ZooKeeper.
774    *
775    */
776   @Override
777   public void close() {
778     try {
779       if (recoverableZooKeeper != null) {
780         recoverableZooKeeper.close();
781       }
782     } catch (InterruptedException e) {
783       Thread.currentThread().interrupt();
784     }
785   }
786 
787   public Configuration getConfiguration() {
788     return conf;
789   }
790 
791   @Override
792   public void abort(String why, Throwable e) {
793     if (this.abortable != null) this.abortable.abort(why, e);
794     else this.aborted = true;
795   }
796 
797   @Override
798   public boolean isAborted() {
799     return this.abortable == null? this.aborted: this.abortable.isAborted();
800   }
801 
802   /**
803    * @return Path to the currently active master.
804    */
805   public String getMasterAddressZNode() {
806     return this.masterAddressZNode;
807   }
808 
809   /**
810    * @return ZooKeeper znode for region normalizer state
811    */
812   public String getRegionNormalizerZNode() {
813     return regionNormalizerZNode;
814   }
815 }