View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.util.concurrent.atomic.AtomicBoolean;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.Stoppable;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.zookeeper.KeeperException;
29  
30  /**
31   * Handles coordination of a single "leader" instance among many possible
32   * candidates.  The first {@link ZKLeaderManager} to successfully create
33   * the given znode becomes the leader, allowing the instance to continue
34   * with whatever processing must be protected.  Other {@link ZKLeaderManager}
35   * instances will wait to be notified of changes to the leader znode.
36   * If the current master instance fails, the ephemeral leader znode will
37   * be removed, and all waiting instances will be notified, with the race
38   * to claim the leader znode beginning all over again.
39   * @deprecated Not used
40   */
41  @Deprecated
42  @InterfaceAudience.Private
43  public class ZKLeaderManager extends ZooKeeperListener {
44    private static Log LOG = LogFactory.getLog(ZKLeaderManager.class);
45  
46    private final AtomicBoolean leaderExists = new AtomicBoolean();
47    private String leaderZNode;
48    private byte[] nodeId;
49    private Stoppable candidate;
50  
51    public ZKLeaderManager(ZooKeeperWatcher watcher, String leaderZNode,
52        byte[] identifier, Stoppable candidate) {
53      super(watcher);
54      this.leaderZNode = leaderZNode;
55      this.nodeId = identifier;
56      this.candidate = candidate;
57    }
58  
59    public void start() {
60      try {
61        watcher.registerListener(this);
62        String parent = ZKUtil.getParent(leaderZNode);
63        if (ZKUtil.checkExists(watcher, parent) < 0) {
64          ZKUtil.createWithParents(watcher, parent);
65        }
66      } catch (KeeperException ke) {
67        watcher.abort("Unhandled zk exception when starting", ke);
68        candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
69      }
70    }
71  
72    @Override
73    public void nodeCreated(String path) {
74      if (leaderZNode.equals(path) && !candidate.isStopped()) {
75        handleLeaderChange();
76      }
77    }
78  
79    @Override
80    public void nodeDeleted(String path) {
81      if (leaderZNode.equals(path) && !candidate.isStopped()) {
82        handleLeaderChange();
83      }
84    }
85  
86    private void handleLeaderChange() {
87      try {
88        synchronized(leaderExists) {
89          if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
90            LOG.info("Found new leader for znode: "+leaderZNode);
91            leaderExists.set(true);
92          } else {
93            LOG.info("Leader change, but no new leader found");
94            leaderExists.set(false);
95            leaderExists.notifyAll();
96          }
97        }
98      } catch (KeeperException ke) {
99        watcher.abort("ZooKeeper error checking for leader znode", ke);
100       candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
101     }
102   }
103 
104   /**
105    * Blocks until this instance has claimed the leader ZNode in ZooKeeper
106    */
107   public void waitToBecomeLeader() {
108     while (!candidate.isStopped()) {
109       try {
110         if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
111           // claimed the leader znode
112           leaderExists.set(true);
113           if (LOG.isDebugEnabled()) {
114             LOG.debug("Claimed the leader znode as '"+
115                 Bytes.toStringBinary(nodeId)+"'");
116           }
117           return;
118         }
119 
120         // if claiming the node failed, there should be another existing node
121         byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
122         if (currentId != null && Bytes.equals(currentId, nodeId)) {
123           // claimed with our ID, but we didn't grab it, possibly restarted?
124           LOG.info("Found existing leader with our ID ("+
125               Bytes.toStringBinary(nodeId)+"), removing");
126           ZKUtil.deleteNode(watcher, leaderZNode);
127           leaderExists.set(false);
128         } else {
129           LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
130           leaderExists.set(true);
131         }
132       } catch (KeeperException ke) {
133         watcher.abort("Unexpected error from ZK, stopping candidate", ke);
134         candidate.stop("Unexpected error from ZK: "+ke.getMessage());
135         return;
136       }
137 
138       // wait for next chance
139       synchronized(leaderExists) {
140         while (leaderExists.get() && !candidate.isStopped()) {
141           try {
142             leaderExists.wait();
143           } catch (InterruptedException ie) {
144             LOG.debug("Interrupted waiting on leader", ie);
145           }
146         }
147       }
148     }
149   }
150 
151   /**
152    * Removes the leader znode, if it is currently claimed by this instance.
153    */
154   public void stepDownAsLeader() {
155     try {
156       synchronized(leaderExists) {
157         if (!leaderExists.get()) {
158           return;
159         }
160         byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
161         if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
162           LOG.info("Stepping down as leader");
163           ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
164           leaderExists.set(false);
165         } else {
166           LOG.info("Not current leader, no need to step down");
167         }
168       }
169     } catch (KeeperException ke) {
170       watcher.abort("Unhandled zookeeper exception removing leader node", ke);
171       candidate.stop("Unhandled zookeeper exception removing leader node: "
172           + ke.getMessage());
173     } catch (InterruptedException e) {
174       watcher.abort("Unhandled zookeeper exception removing leader node", e);
175       candidate.stop("Unhandled zookeeper exception removing leader node: "
176           + e.getMessage());
177     }
178   }
179 
180   public boolean hasLeader() {
181     return leaderExists.get();
182   }
183 }