View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.util.concurrent.atomic.AtomicBoolean;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.Stoppable;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.zookeeper.KeeperException;
28  
29  /**
30   * Handles coordination of a single "leader" instance among many possible
31   * candidates.  The first {@link ZKLeaderManager} to successfully create
32   * the given znode becomes the leader, allowing the instance to continue
33   * with whatever processing must be protected.  Other {@link ZKLeaderManager}
34   * instances will wait to be notified of changes to the leader znode.
35   * If the current master instance fails, the ephemeral leader znode will
36   * be removed, and all waiting instances will be notified, with the race
37   * to claim the leader znode beginning all over again.
38   */
39  public class ZKLeaderManager extends ZooKeeperListener {
40    private static Log LOG = LogFactory.getLog(ZKLeaderManager.class);
41  
42    private final AtomicBoolean leaderExists = new AtomicBoolean();
43    private String leaderZNode;
44    private byte[] nodeId;
45    private Stoppable candidate;
46  
47    public ZKLeaderManager(ZooKeeperWatcher watcher, String leaderZNode,
48        byte[] identifier, Stoppable candidate) {
49      super(watcher);
50      this.leaderZNode = leaderZNode;
51      this.nodeId = identifier;
52      this.candidate = candidate;
53    }
54  
55    public void start() {
56      try {
57        watcher.registerListener(this);
58        String parent = ZKUtil.getParent(leaderZNode);
59        if (ZKUtil.checkExists(watcher, parent) < 0) {
60          ZKUtil.createWithParents(watcher, parent);
61        }
62      } catch (KeeperException ke) {
63        watcher.abort("Unhandled zk exception when starting", ke);
64        candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
65      }
66    }
67  
68    @Override
69    public void nodeCreated(String path) {
70      if (leaderZNode.equals(path) && !candidate.isStopped()) {
71        handleLeaderChange();
72      }
73    }
74  
75    @Override
76    public void nodeDeleted(String path) {
77      if (leaderZNode.equals(path) && !candidate.isStopped()) {
78        handleLeaderChange();
79      }
80    }
81  
82    private void handleLeaderChange() {
83      try {
84        synchronized(leaderExists) {
85          if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
86            LOG.info("Found new leader for znode: "+leaderZNode);
87            leaderExists.set(true);
88          } else {
89            LOG.info("Leader change, but no new leader found");
90            leaderExists.set(false);
91            leaderExists.notifyAll();
92          }
93        }
94      } catch (KeeperException ke) {
95        watcher.abort("ZooKeeper error checking for leader znode", ke);
96        candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
97      }
98    }
99  
100   /**
101    * Blocks until this instance has claimed the leader ZNode in ZooKeeper
102    */
103   public void waitToBecomeLeader() {
104     while (!candidate.isStopped()) {
105       try {
106         if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
107           // claimed the leader znode
108           leaderExists.set(true);
109           if (LOG.isDebugEnabled()) {
110             LOG.debug("Claimed the leader znode as '"+
111                 Bytes.toStringBinary(nodeId)+"'");
112           }
113           return;
114         }
115 
116         // if claiming the node failed, there should be another existing node
117         byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
118         if (currentId != null && Bytes.equals(currentId, nodeId)) {
119           // claimed with our ID, but we didn't grab it, possibly restarted?
120           LOG.info("Found existing leader with our ID ("+
121               Bytes.toStringBinary(nodeId)+"), removing");
122           ZKUtil.deleteNode(watcher, leaderZNode);
123           leaderExists.set(false);
124         } else {
125           LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
126           leaderExists.set(true);
127         }
128       } catch (KeeperException ke) {
129         watcher.abort("Unexpected error from ZK, stopping candidate", ke);
130         candidate.stop("Unexpected error from ZK: "+ke.getMessage());
131         return;
132       }
133 
134       // wait for next chance
135       synchronized(leaderExists) {
136         while (leaderExists.get() && !candidate.isStopped()) {
137           try {
138             leaderExists.wait();
139           } catch (InterruptedException ie) {
140             LOG.debug("Interrupted waiting on leader", ie);
141           }
142         }
143       }
144     }
145   }
146 
147   /**
148    * Removes the leader znode, if it is currently claimed by this instance.
149    */
150   public void stepDownAsLeader() {
151     try {
152       synchronized(leaderExists) {
153         if (!leaderExists.get()) {
154           return;
155         }
156         byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
157         if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
158           LOG.info("Stepping down as leader");
159           ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
160           leaderExists.set(false);
161         } else {
162           LOG.info("Not current leader, no need to step down");
163         }
164       }
165     } catch (KeeperException ke) {
166       watcher.abort("Unhandled zookeeper exception removing leader node", ke);
167       candidate.stop("Unhandled zookeeper exception removing leader node: "
168           + ke.getMessage());
169     }
170   }
171 
172   public boolean hasLeader() {
173     return leaderExists.get();
174   }
175 }