1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.Stoppable;
27 import org.apache.hadoop.hbase.util.Bytes;
28 import org.apache.zookeeper.KeeperException;
29
30
31
32
33
34
35
36
37
38
39
40
41 @Deprecated
42 @InterfaceAudience.Private
43 public class ZKLeaderManager extends ZooKeeperListener {
44 private static final Log LOG = LogFactory.getLog(ZKLeaderManager.class);
45
46 private final AtomicBoolean leaderExists = new AtomicBoolean();
47 private String leaderZNode;
48 private byte[] nodeId;
49 private Stoppable candidate;
50
51 public ZKLeaderManager(ZooKeeperWatcher watcher, String leaderZNode,
52 byte[] identifier, Stoppable candidate) {
53 super(watcher);
54 this.leaderZNode = leaderZNode;
55 this.nodeId = identifier;
56 this.candidate = candidate;
57 }
58
59 public void start() {
60 try {
61 watcher.registerListener(this);
62 String parent = ZKUtil.getParent(leaderZNode);
63 if (ZKUtil.checkExists(watcher, parent) < 0) {
64 ZKUtil.createWithParents(watcher, parent);
65 }
66 } catch (KeeperException ke) {
67 watcher.abort("Unhandled zk exception when starting", ke);
68 candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
69 }
70 }
71
72 @Override
73 public void nodeCreated(String path) {
74 if (leaderZNode.equals(path) && !candidate.isStopped()) {
75 handleLeaderChange();
76 }
77 }
78
79 @Override
80 public void nodeDeleted(String path) {
81 if (leaderZNode.equals(path) && !candidate.isStopped()) {
82 handleLeaderChange();
83 }
84 }
85
86 private void handleLeaderChange() {
87 try {
88 synchronized(leaderExists) {
89 if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
90 LOG.info("Found new leader for znode: "+leaderZNode);
91 leaderExists.set(true);
92 } else {
93 LOG.info("Leader change, but no new leader found");
94 leaderExists.set(false);
95 leaderExists.notifyAll();
96 }
97 }
98 } catch (KeeperException ke) {
99 watcher.abort("ZooKeeper error checking for leader znode", ke);
100 candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
101 }
102 }
103
104
105
106
107 public void waitToBecomeLeader() {
108 while (!candidate.isStopped()) {
109 try {
110 if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
111
112 leaderExists.set(true);
113 if (LOG.isDebugEnabled()) {
114 LOG.debug("Claimed the leader znode as '"+
115 Bytes.toStringBinary(nodeId)+"'");
116 }
117 return;
118 }
119
120
121 byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
122 if (currentId != null && Bytes.equals(currentId, nodeId)) {
123
124 LOG.info("Found existing leader with our ID ("+
125 Bytes.toStringBinary(nodeId)+"), removing");
126 ZKUtil.deleteNode(watcher, leaderZNode);
127 leaderExists.set(false);
128 } else {
129 LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
130 leaderExists.set(true);
131 }
132 } catch (KeeperException ke) {
133 watcher.abort("Unexpected error from ZK, stopping candidate", ke);
134 candidate.stop("Unexpected error from ZK: "+ke.getMessage());
135 return;
136 }
137
138
139 synchronized(leaderExists) {
140 while (leaderExists.get() && !candidate.isStopped()) {
141 try {
142 leaderExists.wait();
143 } catch (InterruptedException ie) {
144 LOG.debug("Interrupted waiting on leader", ie);
145 }
146 }
147 }
148 }
149 }
150
151
152
153
154 public void stepDownAsLeader() {
155 try {
156 synchronized(leaderExists) {
157 if (!leaderExists.get()) {
158 return;
159 }
160 byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
161 if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
162 LOG.info("Stepping down as leader");
163 ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
164 leaderExists.set(false);
165 } else {
166 LOG.info("Not current leader, no need to step down");
167 }
168 }
169 } catch (KeeperException ke) {
170 watcher.abort("Unhandled zookeeper exception removing leader node", ke);
171 candidate.stop("Unhandled zookeeper exception removing leader node: "
172 + ke.getMessage());
173 } catch (InterruptedException e) {
174 watcher.abort("Unhandled zookeeper exception removing leader node", e);
175 candidate.stop("Unhandled zookeeper exception removing leader node: "
176 + e.getMessage());
177 }
178 }
179
180 public boolean hasLeader() {
181 return leaderExists.get();
182 }
183 }