View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.CopyOnWriteArrayList;
28  import java.util.concurrent.CountDownLatch;
29  import java.util.regex.Matcher;
30  import java.util.regex.Pattern;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Abortable;
36  import org.apache.hadoop.hbase.AuthUtil;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.security.Superusers;
42  import org.apache.hadoop.security.UserGroupInformation;
43  import org.apache.zookeeper.KeeperException;
44  import org.apache.zookeeper.WatchedEvent;
45  import org.apache.zookeeper.Watcher;
46  import org.apache.zookeeper.ZooDefs.Ids;
47  import org.apache.zookeeper.ZooDefs.Perms;
48  import org.apache.zookeeper.data.ACL;
49  import org.apache.zookeeper.data.Id;
50  import org.apache.zookeeper.data.Stat;
51  
52  /**
53   * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
54   * for each Master, RegionServer, and client process.
55   *
56   * <p>This is the only class that implements {@link Watcher}.  Other internal
57   * classes which need to be notified of ZooKeeper events must register with
58   * the local instance of this watcher via {@link #registerListener}.
59   *
60   * <p>This class also holds and manages the connection to ZooKeeper.  Code to
61   * deal with connection related events and exceptions are handled here.
62   */
63  @InterfaceAudience.Private
64  public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
65    private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
66  
67    // Identifier for this watcher (for logging only).  It is made of the prefix
68    // passed on construction and the zookeeper sessionid.
69    private String prefix;
70    private String identifier;
71  
72    // zookeeper quorum
73    private String quorum;
74  
75    // zookeeper connection
76    private RecoverableZooKeeper recoverableZooKeeper;
77  
78    // abortable in case of zk failure
79    protected Abortable abortable;
80    // Used if abortable is null
81    private boolean aborted = false;
82  
83    // listeners to be notified
84    private final List<ZooKeeperListener> listeners =
85      new CopyOnWriteArrayList<ZooKeeperListener>();
86  
87    // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
88    // negotiation to complete
89    public CountDownLatch saslLatch = new CountDownLatch(1);
90  
91    // node names
92  
93    // base znode for this cluster
94    public String baseZNode;
95    //znodes containing the locations of the servers hosting the meta replicas
96    private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();
97    // znode containing ephemeral nodes of the regionservers
98    public String rsZNode;
99    // znode containing ephemeral nodes of the draining regionservers
100   public String drainingZNode;
101   // znode of currently active master
102   private String masterAddressZNode;
103   // znode of this master in backup master directory, if not the active master
104   public String backupMasterAddressesZNode;
105   // znode containing the current cluster state
106   public String clusterStateZNode;
107   // znode used for table disabling/enabling
108   @Deprecated
109   public String tableZNode;
110   // znode containing the unique cluster ID
111   public String clusterIdZNode;
112   // znode used for log splitting work assignment
113   public String splitLogZNode;
114   // znode containing the state of the load balancer
115   public String balancerZNode;
116   // znode containing the state of region normalizer
117   private String regionNormalizerZNode;
118   // znode containing the lock for the tables
119   public String tableLockZNode;
120   // znode containing the state of recovering regions
121   public String recoveringRegionsZNode;
122   // znode containing namespace descriptors
123   public static String namespaceZNode = "namespace";
124 
125   public final static String META_ZNODE_PREFIX = "meta-region-server";
126 
127   private final Configuration conf;
128 
129   private final Exception constructorCaller;
130 
131   /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
132   private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
133 
134   /**
135    * Instantiate a ZooKeeper connection and watcher.
136    * @param identifier string that is passed to RecoverableZookeeper to be used as
137    * identifier for this instance. Use null for default.
138    * @throws IOException
139    * @throws ZooKeeperConnectionException
140    */
141   public ZooKeeperWatcher(Configuration conf, String identifier,
142       Abortable abortable) throws ZooKeeperConnectionException, IOException {
143     this(conf, identifier, abortable, false);
144   }
145 
146   /**
147    * Instantiate a ZooKeeper connection and watcher.
148    * @param conf
149    * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
150    *          this instance. Use null for default.
151    * @param abortable Can be null if there is on error there is no host to abort: e.g. client
152    *          context.
153    * @param canCreateBaseZNode
154    * @throws IOException
155    * @throws ZooKeeperConnectionException
156    */
157   public ZooKeeperWatcher(Configuration conf, String identifier,
158       Abortable abortable, boolean canCreateBaseZNode)
159   throws IOException, ZooKeeperConnectionException {
160     this.conf = conf;
161     // Capture a stack trace now.  Will print it out later if problem so we can
162     // distingush amongst the myriad ZKWs.
163     try {
164       throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
165     } catch (Exception e) {
166       this.constructorCaller = e;
167     }
168     this.quorum = ZKConfig.getZKQuorumServersString(conf);
169     this.prefix = identifier;
170     // Identifier will get the sessionid appended later below down when we
171     // handle the syncconnect event.
172     this.identifier = identifier + "0x0";
173     this.abortable = abortable;
174     setNodeNames(conf);
175     this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
176     if (canCreateBaseZNode) {
177       createBaseZNodes();
178     }
179   }
180 
181   private void createBaseZNodes() throws ZooKeeperConnectionException {
182     try {
183       // Create all the necessary "directories" of znodes
184       ZKUtil.createWithParents(this, baseZNode);
185       ZKUtil.createAndFailSilent(this, rsZNode);
186       ZKUtil.createAndFailSilent(this, drainingZNode);
187       ZKUtil.createAndFailSilent(this, tableZNode);
188       ZKUtil.createAndFailSilent(this, splitLogZNode);
189       ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
190       ZKUtil.createAndFailSilent(this, tableLockZNode);
191       ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
192     } catch (KeeperException e) {
193       throw new ZooKeeperConnectionException(
194           prefix("Unexpected KeeperException creating base node"), e);
195     }
196   }
197 
198   /** Returns whether the znode is supposed to be readable by the client
199    * and DOES NOT contain sensitive information (world readable).*/
200   public boolean isClientReadable(String node) {
201     // Developer notice: These znodes are world readable. DO NOT add more znodes here UNLESS
202     // all clients need to access this data to work. Using zk for sharing data to clients (other
203     // than service lookup case is not a recommended design pattern.
204     return
205         node.equals(baseZNode) ||
206         isAnyMetaReplicaZnode(node) ||
207         node.equals(getMasterAddressZNode()) ||
208         node.equals(clusterIdZNode)||
209         node.equals(rsZNode) ||
210         // /hbase/table and /hbase/table/foo is allowed, /hbase/table-lock is not
211         node.equals(tableZNode) ||
212         node.startsWith(tableZNode + "/");
213   }
214 
215   /**
216    * On master start, we check the znode ACLs under the root directory and set the ACLs properly
217    * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
218    * so that the existing znodes created with open permissions are now changed with restrictive
219    * perms.
220    */
221   public void checkAndSetZNodeAcls() {
222     if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
223       LOG.info("not a secure deployment, proceeding");
224       return;
225     }
226 
227     // Check the base znodes permission first. Only do the recursion if base znode's perms are not
228     // correct.
229     try {
230       List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
231 
232       if (!isBaseZnodeAclSetup(actualAcls)) {
233         LOG.info("setting znode ACLs");
234         setZnodeAclsRecursive(baseZNode);
235       }
236     } catch(KeeperException.NoNodeException nne) {
237       return;
238     } catch(InterruptedException ie) {
239       interruptedException(ie);
240     } catch (IOException|KeeperException e) {
241       LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
242     }
243   }
244 
245   /**
246    * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
247    * will be set last in case the master fails in between.
248    * @param znode
249    */
250   private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
251     List<String> children = recoverableZooKeeper.getChildren(znode, false);
252 
253     for (String child : children) {
254       setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
255     }
256     List<ACL> acls = ZKUtil.createACL(this, znode, true);
257     LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
258     recoverableZooKeeper.setAcl(znode, acls, -1);
259   }
260 
261   /**
262    * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
263    * @param acls acls from zookeeper
264    * @return whether ACLs are set for the base znode
265    * @throws IOException
266    */
267   private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
268     if (LOG.isDebugEnabled()) {
269       LOG.debug("Checking znode ACLs");
270     }
271     String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
272     // Check whether ACL set for all superusers
273     if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
274       return false;
275     }
276 
277     // this assumes that current authenticated user is the same as zookeeper client user
278     // configured via JAAS
279     String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
280 
281     if (acls.isEmpty()) {
282       if (LOG.isDebugEnabled()) {
283         LOG.debug("ACL is empty");
284       }
285       return false;
286     }
287 
288     for (ACL acl : acls) {
289       int perms = acl.getPerms();
290       Id id = acl.getId();
291       // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
292       // and one for the hbase user
293       if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
294         if (perms != Perms.READ) {
295           if (LOG.isDebugEnabled()) {
296             LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
297               id, perms, Perms.READ));
298           }
299           return false;
300         }
301       } else if (superUsers != null && isSuperUserId(superUsers, id)) {
302         if (perms != Perms.ALL) {
303           if (LOG.isDebugEnabled()) {
304             LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
305               id, perms, Perms.ALL));
306           }
307           return false;
308         }
309       } else if ("sasl".equals(id.getScheme())) {
310         String name = id.getId();
311         // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
312         Matcher match = NAME_PATTERN.matcher(name);
313         if (match.matches()) {
314           name = match.group(1);
315         }
316         if (name.equals(hbaseUser)) {
317           if (perms != Perms.ALL) {
318             if (LOG.isDebugEnabled()) {
319               LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
320                 id, perms, Perms.ALL));
321             }
322             return false;
323           }
324         } else {
325           if (LOG.isDebugEnabled()) {
326             LOG.debug("Unexpected shortname in SASL ACL: " + id);
327           }
328           return false;
329         }
330       } else {
331         if (LOG.isDebugEnabled()) {
332           LOG.debug("unexpected ACL id '" + id + "'");
333         }
334         return false;
335       }
336     }
337     return true;
338   }
339 
340   /*
341    * Validate whether ACL set for all superusers.
342    */
343   private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
344     for (String user : superUsers) {
345       boolean hasAccess = false;
346       // TODO: Validate super group members also when ZK supports setting node ACL for groups.
347       if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
348         for (ACL acl : acls) {
349           if (user.equals(acl.getId().getId())) {
350             if (acl.getPerms() == Perms.ALL) {
351               hasAccess = true;
352             } else {
353               if (LOG.isDebugEnabled()) {
354                 LOG.debug(String.format(
355                   "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
356                   acl.getId().getId(), acl.getPerms(), Perms.ALL));
357               }
358             }
359             break;
360           }
361         }
362         if (!hasAccess) {
363           return false;
364         }
365       }
366     }
367     return true;
368   }
369 
370   /*
371    * Validate whether ACL ID is superuser.
372    */
373   public static boolean isSuperUserId(String[] superUsers, Id id) {
374     for (String user : superUsers) {
375       // TODO: Validate super group members also when ZK supports setting node ACL for groups.
376       if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
377         return true;
378       }
379     }
380     return false;
381   }
382 
383   @Override
384   public String toString() {
385     return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
386   }
387 
388   /**
389    * Adds this instance's identifier as a prefix to the passed <code>str</code>
390    * @param str String to amend.
391    * @return A new string with this instance's identifier as prefix: e.g.
392    * if passed 'hello world', the returned string could be
393    */
394   public String prefix(final String str) {
395     return this.toString() + " " + str;
396   }
397 
398   /**
399    * Set the local variable node names using the specified configuration.
400    */
401   private void setNodeNames(Configuration conf) {
402     baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
403         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
404     metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
405            conf.get("zookeeper.znode.metaserver", "meta-region-server")));
406     int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
407             HConstants.DEFAULT_META_REPLICA_NUM);
408     for (int i = 1; i < numMetaReplicas; i++) {
409       String str = ZKUtil.joinZNode(baseZNode,
410         conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
411       metaReplicaZnodes.put(i, str);
412     }
413     rsZNode = ZKUtil.joinZNode(baseZNode,
414         conf.get("zookeeper.znode.rs", "rs"));
415     drainingZNode = ZKUtil.joinZNode(baseZNode,
416         conf.get("zookeeper.znode.draining.rs", "draining"));
417     masterAddressZNode = ZKUtil.joinZNode(baseZNode,
418         conf.get("zookeeper.znode.master", "master"));
419     backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
420         conf.get("zookeeper.znode.backup.masters", "backup-masters"));
421     clusterStateZNode = ZKUtil.joinZNode(baseZNode,
422         conf.get("zookeeper.znode.state", "running"));
423     tableZNode = ZKUtil.joinZNode(baseZNode,
424         conf.get("zookeeper.znode.tableEnableDisable", "table"));
425     clusterIdZNode = ZKUtil.joinZNode(baseZNode,
426         conf.get("zookeeper.znode.clusterId", "hbaseid"));
427     splitLogZNode = ZKUtil.joinZNode(baseZNode,
428         conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
429     balancerZNode = ZKUtil.joinZNode(baseZNode,
430         conf.get("zookeeper.znode.balancer", "balancer"));
431     regionNormalizerZNode = ZKUtil.joinZNode(baseZNode,
432       conf.get("zookeeper.znode.regionNormalizer", "normalizer"));
433     tableLockZNode = ZKUtil.joinZNode(baseZNode,
434         conf.get("zookeeper.znode.tableLock", "table-lock"));
435     recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
436         conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
437     namespaceZNode = ZKUtil.joinZNode(baseZNode,
438         conf.get("zookeeper.znode.namespace", "namespace"));
439   }
440 
441   /**
442    * Is the znode of any meta replica
443    * @param node
444    * @return true or false
445    */
446   public boolean isAnyMetaReplicaZnode(String node) {
447     if (metaReplicaZnodes.values().contains(node)) {
448       return true;
449     }
450     return false;
451   }
452 
453   /**
454    * Is it the default meta replica's znode
455    * @param node
456    * @return true or false
457    */
458   public boolean isDefaultMetaReplicaZnode(String node) {
459     if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
460       return true;
461     }
462     return false;
463   }
464 
465   /**
466    * Get the znodes corresponding to the meta replicas from ZK
467    * @return list of znodes
468    * @throws KeeperException
469    */
470   public List<String> getMetaReplicaNodes() throws KeeperException {
471     List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
472     List<String> metaReplicaNodes = new ArrayList<String>(2);
473     String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
474     for (String child : childrenOfBaseNode) {
475       if (child.startsWith(pattern)) metaReplicaNodes.add(child);
476     }
477     return metaReplicaNodes;
478   }
479 
480   /**
481    * Get the znode string corresponding to a replicaId
482    * @param replicaId
483    * @return znode
484    */
485   public String getZNodeForReplica(int replicaId) {
486     String str = metaReplicaZnodes.get(replicaId);
487     // return a newly created path but don't update the cache of paths
488     // This is mostly needed for tests that attempt to create meta replicas
489     // from outside the master
490     if (str == null) {
491       str = ZKUtil.joinZNode(baseZNode,
492           conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
493     }
494     return str;
495   }
496 
497   /**
498    * Parse the meta replicaId from the passed znode
499    * @param znode
500    * @return replicaId
501    */
502   public int getMetaReplicaIdFromZnode(String znode) {
503     String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
504     if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
505     // the non-default replicas are of the pattern meta-region-server-<replicaId>
506     String nonDefaultPattern = pattern + "-";
507     return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
508   }
509 
510   /**
511    * Register the specified listener to receive ZooKeeper events.
512    * @param listener
513    */
514   public void registerListener(ZooKeeperListener listener) {
515     listeners.add(listener);
516   }
517 
518   /**
519    * Register the specified listener to receive ZooKeeper events and add it as
520    * the first in the list of current listeners.
521    * @param listener
522    */
523   public void registerListenerFirst(ZooKeeperListener listener) {
524     listeners.add(0, listener);
525   }
526 
527   public void unregisterListener(ZooKeeperListener listener) {
528     listeners.remove(listener);
529   }
530 
531   /**
532    * Clean all existing listeners
533    */
534   public void unregisterAllListeners() {
535     listeners.clear();
536   }
537 
538   /**
539    * Get a copy of current registered listeners
540    */
541   public List<ZooKeeperListener> getListeners() {
542     return new ArrayList<ZooKeeperListener>(listeners);
543   }
544 
545   /**
546    * @return The number of currently registered listeners
547    */
548   public int getNumberOfListeners() {
549     return listeners.size();
550   }
551 
552   /**
553    * Get the connection to ZooKeeper.
554    * @return connection reference to zookeeper
555    */
556   public RecoverableZooKeeper getRecoverableZooKeeper() {
557     return recoverableZooKeeper;
558   }
559 
560   public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
561     recoverableZooKeeper.reconnectAfterExpiration();
562   }
563 
564   /**
565    * Get the quorum address of this instance.
566    * @return quorum string of this zookeeper connection instance
567    */
568   public String getQuorum() {
569     return quorum;
570   }
571 
572   /**
573    * @return the base znode of this zookeeper connection instance.
574    */
575   public String getBaseZNode() {
576     return baseZNode;
577   }
578 
579   /**
580    * Method called from ZooKeeper for events and connection status.
581    * <p>
582    * Valid events are passed along to listeners.  Connection status changes
583    * are dealt with locally.
584    */
585   @Override
586   public void process(WatchedEvent event) {
587     LOG.debug(prefix("Received ZooKeeper Event, " +
588         "type=" + event.getType() + ", " +
589         "state=" + event.getState() + ", " +
590         "path=" + event.getPath()));
591 
592     switch(event.getType()) {
593 
594       // If event type is NONE, this is a connection status change
595       case None: {
596         connectionEvent(event);
597         break;
598       }
599 
600       // Otherwise pass along to the listeners
601 
602       case NodeCreated: {
603         for(ZooKeeperListener listener : listeners) {
604           listener.nodeCreated(event.getPath());
605         }
606         break;
607       }
608 
609       case NodeDeleted: {
610         for(ZooKeeperListener listener : listeners) {
611           listener.nodeDeleted(event.getPath());
612         }
613         break;
614       }
615 
616       case NodeDataChanged: {
617         for(ZooKeeperListener listener : listeners) {
618           listener.nodeDataChanged(event.getPath());
619         }
620         break;
621       }
622 
623       case NodeChildrenChanged: {
624         for(ZooKeeperListener listener : listeners) {
625           listener.nodeChildrenChanged(event.getPath());
626         }
627         break;
628       }
629     }
630   }
631 
632   // Connection management
633 
634   /**
635    * Called when there is a connection-related event via the Watcher callback.
636    * <p>
637    * If Disconnected or Expired, this should shutdown the cluster. But, since
638    * we send a KeeperException.SessionExpiredException along with the abort
639    * call, it's possible for the Abortable to catch it and try to create a new
640    * session with ZooKeeper. This is what the client does in HCM.
641    * <p>
642    * @param event
643    */
644   private void connectionEvent(WatchedEvent event) {
645     switch(event.getState()) {
646       case SyncConnected:
647         // Now, this callback can be invoked before the this.zookeeper is set.
648         // Wait a little while.
649         long finished = System.currentTimeMillis() +
650           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
651         while (System.currentTimeMillis() < finished) {
652           try {
653             Thread.sleep(1);
654           } catch (InterruptedException e) {
655             LOG.warn("Interrupted while sleeping");
656             throw new RuntimeException("Interrupted while waiting for" +
657                 " recoverableZooKeeper is set");
658           }
659           if (this.recoverableZooKeeper != null) break;
660         }
661 
662         if (this.recoverableZooKeeper == null) {
663           LOG.error("ZK is null on connection event -- see stack trace " +
664             "for the stack trace when constructor was called on this zkw",
665             this.constructorCaller);
666           throw new NullPointerException("ZK is null");
667         }
668         this.identifier = this.prefix + "-0x" +
669           Long.toHexString(this.recoverableZooKeeper.getSessionId());
670         // Update our identifier.  Otherwise ignore.
671         LOG.debug(this.identifier + " connected");
672         break;
673 
674       // Abort the server if Disconnected or Expired
675       case Disconnected:
676         LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
677         break;
678 
679       case Expired:
680         String msg = prefix(this.identifier + " received expired from " +
681           "ZooKeeper, aborting");
682         // TODO: One thought is to add call to ZooKeeperListener so say,
683         // ZooKeeperNodeTracker can zero out its data values.
684         if (this.abortable != null) {
685           this.abortable.abort(msg, new KeeperException.SessionExpiredException());
686         }
687         break;
688 
689       case ConnectedReadOnly:
690       case SaslAuthenticated:
691       case AuthFailed:
692         break;
693 
694       default:
695         throw new IllegalStateException("Received event is not valid: " + event.getState());
696     }
697   }
698 
699   /**
700    * Forces a synchronization of this ZooKeeper client connection.
701    * <p>
702    * Executing this method before running other methods will ensure that the
703    * subsequent operations are up-to-date and consistent as of the time that
704    * the sync is complete.
705    * <p>
706    * This is used for compareAndSwap type operations where we need to read the
707    * data of an existing node and delete or transition that node, utilizing the
708    * previously read version and data.  We want to ensure that the version read
709    * is up-to-date from when we begin the operation.
710    */
711   public void sync(String path) throws KeeperException {
712     this.recoverableZooKeeper.sync(path, null, null);
713   }
714 
715   /**
716    * Handles KeeperExceptions in client calls.
717    * <p>
718    * This may be temporary but for now this gives one place to deal with these.
719    * <p>
720    * TODO: Currently this method rethrows the exception to let the caller handle
721    * <p>
722    * @param ke
723    * @throws KeeperException
724    */
725   public void keeperException(KeeperException ke)
726   throws KeeperException {
727     LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
728     throw ke;
729   }
730 
731   /**
732    * Handles InterruptedExceptions in client calls.
733    * <p>
734    * This may be temporary but for now this gives one place to deal with these.
735    * <p>
736    * TODO: Currently, this method does nothing.
737    *       Is this ever expected to happen?  Do we abort or can we let it run?
738    *       Maybe this should be logged as WARN?  It shouldn't happen?
739    * <p>
740    * @param ie
741    */
742   public void interruptedException(InterruptedException ie) {
743     LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
744     // At least preserver interrupt.
745     Thread.currentThread().interrupt();
746     // no-op
747   }
748 
749   /**
750    * Close the connection to ZooKeeper.
751    *
752    */
753   @Override
754   public void close() {
755     try {
756       if (recoverableZooKeeper != null) {
757         recoverableZooKeeper.close();
758       }
759     } catch (InterruptedException e) {
760       Thread.currentThread().interrupt();
761     }
762   }
763 
764   public Configuration getConfiguration() {
765     return conf;
766   }
767 
768   @Override
769   public void abort(String why, Throwable e) {
770     if (this.abortable != null) this.abortable.abort(why, e);
771     else this.aborted = true;
772   }
773 
774   @Override
775   public boolean isAborted() {
776     return this.abortable == null? this.aborted: this.abortable.isAborted();
777   }
778 
779   /**
780    * @return Path to the currently active master.
781    */
782   public String getMasterAddressZNode() {
783     return this.masterAddressZNode;
784   }
785 
786   /**
787    * @return ZooKeeper znode for region normalizer state
788    */
789   public String getRegionNormalizerZNode() {
790     return regionNormalizerZNode;
791   }
792 }