View Javadoc

/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.CopyOnWriteArrayList;
28  import java.util.concurrent.CountDownLatch;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Abortable;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
36  import org.apache.hadoop.hbase.util.Threads;
37  import org.apache.zookeeper.KeeperException;
38  import org.apache.zookeeper.WatchedEvent;
39  import org.apache.zookeeper.Watcher;
40  import org.apache.zookeeper.ZooDefs;
41  import org.apache.zookeeper.data.ACL;
42  
43  /**
44   * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
45   * for each Master, RegionServer, and client process.
46   *
47   * <p>This is the only class that implements {@link Watcher}.  Other internal
48   * classes which need to be notified of ZooKeeper events must register with
49   * the local instance of this watcher via {@link #registerListener}.
50   *
51   * <p>This class also holds and manages the connection to ZooKeeper.  Code to
52   * deal with connection related events and exceptions are handled here.
53   */
54  public class ZooKeeperWatcher implements Watcher, Abortable {
55    private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
56  
57    // Identifier for this watcher (for logging only).  It is made of the prefix
58    // passed on construction and the zookeeper sessionid.
59    private String prefix;
60    private String identifier;
61  
62    // zookeeper quorum
63    private String quorum;
64  
65    // zookeeper connection
66    private RecoverableZooKeeper recoverableZooKeeper;
67  
68    // abortable in case of zk failure
69    private Abortable abortable;
70  
71    // listeners to be notified
72    private final List<ZooKeeperListener> listeners =
73      new CopyOnWriteArrayList<ZooKeeperListener>();
74  
75    // set of unassigned nodes watched
76    private Set<String> unassignedNodes = new HashSet<String>();
77  
78    // node names
79  
80    // base znode for this cluster
81    public String baseZNode;
82    // znode containing location of server hosting root region
83    public String rootServerZNode;
84    // znode containing ephemeral nodes of the regionservers
85    public String rsZNode;
86    // znode containing ephemeral nodes of the draining regionservers
87    public String drainingZNode;
88    // znode of currently active master
89    public String masterAddressZNode;
90    // znode of this master in backup master directory, if not the active master
91    public String backupMasterAddressesZNode;
92    // znode containing the current cluster state
93    public String clusterStateZNode;
94    // znode used for region transitioning and assignment
95    public String assignmentZNode;
96    // znode that the master uses for reading/writing the table disabling/enabling states
97    public String masterTableZNode;
98    // znode where the client reads table enabling/disabling states.
99    public String clientTableZNode;
100   // znode where the master writes table disabling/enabling states in the format expected
101   // by 0.92.0/0.92.1 clients for backwards compatibility.  See HBASE-6710 for details.
102   public String masterTableZNode92;
103   // znode containing the unique cluster ID
104   public String clusterIdZNode;
105   // znode used for log splitting work assignment
106   public String splitLogZNode;
107 
108   // Certain ZooKeeper nodes need to be world-readable
109   public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
110     new ArrayList<ACL>() { {
111       add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
112       add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
113     }};
114 
115   private final Configuration conf;
116 
117   private final Exception constructorCaller;
118 
119   /**
120    * Instantiate a ZooKeeper connection and watcher.
121    * @param descriptor Descriptive string that is added to zookeeper sessionid
122    * and used as identifier for this instance.
123    * @throws IOException
124    * @throws ZooKeeperConnectionException
125    */
126   public ZooKeeperWatcher(Configuration conf, String descriptor,
127       Abortable abortable) throws ZooKeeperConnectionException, IOException {
128     this(conf, descriptor, abortable, false);
129   }
130   /**
131    * Instantiate a ZooKeeper connection and watcher.
132    * @param descriptor Descriptive string that is added to zookeeper sessionid
133    * and used as identifier for this instance.
134    * @throws IOException
135    * @throws ZooKeeperConnectionException
136    */
137   public ZooKeeperWatcher(Configuration conf, String descriptor,
138       Abortable abortable, boolean canCreateBaseZNode)
139   throws IOException, ZooKeeperConnectionException {
140     this.conf = conf;
141     // Capture a stack trace now.  Will print it out later if problem so we can
142     // distingush amongst the myriad ZKWs.
143     try {
144       throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
145     } catch (Exception e) {
146       this.constructorCaller = e;
147     }
148     this.quorum = ZKConfig.getZKQuorumServersString(conf);
149     // Identifier will get the sessionid appended later below down when we
150     // handle the syncconnect event.
151     this.prefix = descriptor;
152     this.identifier = descriptor + "0x0";
153     this.abortable = abortable;
154     setNodeNames(conf);
155     this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
156     if (canCreateBaseZNode) {
157       createBaseZNodes();
158     }
159   }
160 
161   private void createBaseZNodes() throws ZooKeeperConnectionException {
162     try {
163       // Create all the necessary "directories" of znodes
164       ZKUtil.createAndFailSilent(this, baseZNode);
165       ZKUtil.createAndFailSilent(this, assignmentZNode);
166       ZKUtil.createAndFailSilent(this, rsZNode);
167       ZKUtil.createAndFailSilent(this, drainingZNode);
168       ZKUtil.createAndFailSilent(this, masterTableZNode);
169       ZKUtil.createAndFailSilent(this, masterTableZNode92);
170       ZKUtil.createAndFailSilent(this, splitLogZNode);
171       ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
172     } catch (KeeperException e) {
173       throw new ZooKeeperConnectionException(
174           prefix("Unexpected KeeperException creating base node"), e);
175     }
176   }
177 
178   private boolean isFinishedRetryingRecoverable(final long finished) {
179     return System.currentTimeMillis() < finished;
180   }
181 
182   @Override
183   public String toString() {
184     return this.identifier;
185   }
186 
187   /**
188    * Adds this instance's identifier as a prefix to the passed <code>str</code>
189    * @param str String to amend.
190    * @return A new string with this instance's identifier as prefix: e.g.
191    * if passed 'hello world', the returned string could be
192    */
193   public String prefix(final String str) {
194     return this.toString() + " " + str;
195   }
196 
197   /**
198    * Set the local variable node names using the specified configuration.
199    */
200   private void setNodeNames(Configuration conf) {
201     baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
202         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
203     rootServerZNode = ZKUtil.joinZNode(baseZNode,
204         conf.get("zookeeper.znode.rootserver", "root-region-server"));
205     rsZNode = ZKUtil.joinZNode(baseZNode,
206         conf.get("zookeeper.znode.rs", "rs"));
207     drainingZNode = ZKUtil.joinZNode(baseZNode,
208         conf.get("zookeeper.znode.draining.rs", "draining"));
209     masterAddressZNode = ZKUtil.joinZNode(baseZNode,
210         conf.get("zookeeper.znode.master", "master"));
211     backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
212         conf.get("zookeeper.znode.backup.masters", "backup-masters"));
213     clusterStateZNode = ZKUtil.joinZNode(baseZNode,
214         conf.get("zookeeper.znode.state", "shutdown"));
215     assignmentZNode = ZKUtil.joinZNode(baseZNode,
216         conf.get("zookeeper.znode.unassigned", "unassigned"));
217     String tableZNodeDefault = "table";
218     masterTableZNode = ZKUtil.joinZNode(baseZNode,
219         conf.get("zookeeper.znode.masterTableEnableDisable", tableZNodeDefault));
220     clientTableZNode = ZKUtil.joinZNode(baseZNode,
221             conf.get("zookeeper.znode.clientTableEnableDisable", tableZNodeDefault));
222     masterTableZNode92 = ZKUtil.joinZNode(baseZNode,
223         conf.get("zookeeper.znode.masterTableEnableDisable92", "table92"));
224     clusterIdZNode = ZKUtil.joinZNode(baseZNode,
225         conf.get("zookeeper.znode.clusterId", "hbaseid"));
226     splitLogZNode = ZKUtil.joinZNode(baseZNode,
227         conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
228   }
229 
230   /**
231    * Register the specified listener to receive ZooKeeper events.
232    * @param listener
233    */
234   public void registerListener(ZooKeeperListener listener) {
235     listeners.add(listener);
236   }
237 
238   /**
239    * Register the specified listener to receive ZooKeeper events and add it as
240    * the first in the list of current listeners.
241    * @param listener
242    */
243   public void registerListenerFirst(ZooKeeperListener listener) {
244     listeners.add(0, listener);
245   }
246 
247   /**
248    * Clean all existing listeners
249    */
250   public void unregisterAllListeners() {
251     listeners.clear();
252   }
253 
254   /**
255    * Get a copy of current registered listeners
256    */
257   public List<ZooKeeperListener> getListeners() {
258     return new ArrayList<ZooKeeperListener>(listeners);
259   }
260 
261   /**
262    * @return The number of currently registered listeners
263    */
264   public int getNumberOfListeners() {
265     return listeners.size();
266   }
267 
268   /**
269    * Get the connection to ZooKeeper.
270    * @return connection reference to zookeeper
271    */
272   public RecoverableZooKeeper getRecoverableZooKeeper() {
273     return recoverableZooKeeper;
274   }
275 
276   public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
277     recoverableZooKeeper.reconnectAfterExpiration();
278   }
279 
280   /**
281    * Get the quorum address of this instance.
282    * @return quorum string of this zookeeper connection instance
283    */
284   public String getQuorum() {
285     return quorum;
286   }
287 
288   /**
289    * Method called from ZooKeeper for events and connection status.
290    * <p>
291    * Valid events are passed along to listeners.  Connection status changes
292    * are dealt with locally.
293    */
294   @Override
295   public void process(WatchedEvent event) {
296     LOG.debug(prefix("Received ZooKeeper Event, " +
297         "type=" + event.getType() + ", " +
298         "state=" + event.getState() + ", " +
299         "path=" + event.getPath()));
300 
301     switch(event.getType()) {
302 
303       // If event type is NONE, this is a connection status change
304       case None: {
305         connectionEvent(event);
306         break;
307       }
308 
309       // Otherwise pass along to the listeners
310 
311       case NodeCreated: {
312         for(ZooKeeperListener listener : listeners) {
313           listener.nodeCreated(event.getPath());
314         }
315         break;
316       }
317 
318       case NodeDeleted: {
319         for(ZooKeeperListener listener : listeners) {
320           listener.nodeDeleted(event.getPath());
321         }
322         break;
323       }
324 
325       case NodeDataChanged: {
326         for(ZooKeeperListener listener : listeners) {
327           listener.nodeDataChanged(event.getPath());
328         }
329         break;
330       }
331 
332       case NodeChildrenChanged: {
333         for(ZooKeeperListener listener : listeners) {
334           listener.nodeChildrenChanged(event.getPath());
335         }
336         break;
337       }
338     }
339   }
340 
341   // Connection management
342 
343   /**
344    * Called when there is a connection-related event via the Watcher callback.
345    * <p>
346    * If Disconnected or Expired, this should shutdown the cluster. But, since
347    * we send a KeeperException.SessionExpiredException along with the abort
348    * call, it's possible for the Abortable to catch it and try to create a new
349    * session with ZooKeeper. This is what the client does in HCM.
350    * <p>
351    * @param event
352    */
353   private void connectionEvent(WatchedEvent event) {
354     switch(event.getState()) {
355       case SyncConnected:
356         // Now, this callback can be invoked before the this.zookeeper is set.
357         // Wait a little while.
358         long finished = System.currentTimeMillis() +
359           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
360         while (System.currentTimeMillis() < finished) {
361           Threads.sleep(1);
362           if (this.recoverableZooKeeper != null) break;
363         }
364         if (this.recoverableZooKeeper == null) {
365           LOG.error("ZK is null on connection event -- see stack trace " +
366             "for the stack trace when constructor was called on this zkw",
367             this.constructorCaller);
368           throw new NullPointerException("ZK is null");
369         }
370         this.identifier = this.prefix + "-0x" +
371           Long.toHexString(this.recoverableZooKeeper.getSessionId());
372         // Update our identifier.  Otherwise ignore.
373         LOG.debug(this.identifier + " connected");
374         break;
375 
376       // Abort the server if Disconnected or Expired
377       case Disconnected:
378         LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
379         break;
380 
381       case Expired:
382         String msg = prefix(this.identifier + " received expired from " +
383           "ZooKeeper, aborting");
384         // TODO: One thought is to add call to ZooKeeperListener so say,
385         // ZooKeeperNodeTracker can zero out its data values.
386         if (this.abortable != null) this.abortable.abort(msg,
387             new KeeperException.SessionExpiredException());
388         break;
389     }
390   }
391 
392   /**
393    * Forces a synchronization of this ZooKeeper client connection.
394    * <p>
395    * Executing this method before running other methods will ensure that the
396    * subsequent operations are up-to-date and consistent as of the time that
397    * the sync is complete.
398    * <p>
399    * This is used for compareAndSwap type operations where we need to read the
400    * data of an existing node and delete or transition that node, utilizing the
401    * previously read version and data.  We want to ensure that the version read
402    * is up-to-date from when we begin the operation.
403    */
404   public void sync(String path) throws KeeperException {
405     this.recoverableZooKeeper.sync(path, null, null);
406   }
407 
408   /**
409    * Handles KeeperExceptions in client calls.
410    * <p>
411    * This may be temporary but for now this gives one place to deal with these.
412    * <p>
413    * TODO: Currently this method rethrows the exception to let the caller handle
414    * <p>
415    * @param ke
416    * @throws KeeperException
417    */
418   public void keeperException(KeeperException ke)
419   throws KeeperException {
420     LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
421     throw ke;
422   }
423 
424   /**
425    * Handles InterruptedExceptions in client calls.
426    * <p>
427    * This may be temporary but for now this gives one place to deal with these.
428    * <p>
429    * TODO: Currently, this method does nothing.
430    *       Is this ever expected to happen?  Do we abort or can we let it run?
431    *       Maybe this should be logged as WARN?  It shouldn't happen?
432    * <p>
433    * @param ie
434    */
435   public void interruptedException(InterruptedException ie) {
436     LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
437     // At least preserver interrupt.
438     Thread.currentThread().interrupt();
439     // no-op
440   }
441 
442   /**
443    * Close the connection to ZooKeeper.
444    * @throws InterruptedException
445    */
446   public void close() {
447     try {
448       if (recoverableZooKeeper != null) {
449         recoverableZooKeeper.close();
450 //        super.close();
451       }
452     } catch (InterruptedException e) {
453     }
454   }
455 
456   public Configuration getConfiguration() {
457     return conf;
458   }
459 
460   @Override
461   public void abort(String why, Throwable e) {
462     this.abortable.abort(why, e);
463   }
464 
465   @Override
466   public boolean isAborted() {
467     return this.abortable.isAborted();
468   }
469 }