1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.Server;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.hadoop.hbase.ZNodeClearer;
30 import org.apache.hadoop.hbase.exceptions.DeserializationException;
31 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
32 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
33 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
35 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
36 import org.apache.zookeeper.KeeperException;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class ActiveMasterManager extends ZooKeeperListener {
53 private static final Log LOG = LogFactory.getLog(ActiveMasterManager.class);
54
55 final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
56 final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
57
58 private final ServerName sn;
59 private int infoPort;
60 private final Server master;
61
62
63
64
65
66
67 ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master) {
68 super(watcher);
69 watcher.registerListener(this);
70 this.sn = sn;
71 this.master = master;
72 }
73
74
75 public void setInfoPort(int infoPort) {
76 this.infoPort = infoPort;
77 }
78
79 @Override
80 public void nodeCreated(String path) {
81 handle(path);
82 }
83
84 @Override
85 public void nodeDeleted(String path) {
86
87
88
89
90
91
92
93
94 if(path.equals(watcher.clusterStateZNode) && !master.isStopped()) {
95 clusterShutDown.set(true);
96 }
97
98 handle(path);
99 }
100
101 void handle(final String path) {
102 if (path.equals(watcher.getMasterAddressZNode()) && !master.isStopped()) {
103 handleMasterNodeChange();
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120 private void handleMasterNodeChange() {
121
122 try {
123 synchronized(clusterHasActiveMaster) {
124 if (ZKUtil.watchAndCheckExists(watcher, watcher.getMasterAddressZNode())) {
125
126 LOG.debug("A master is now available");
127 clusterHasActiveMaster.set(true);
128 } else {
129
130 LOG.debug("No master available. Notifying waiting threads");
131 clusterHasActiveMaster.set(false);
132
133 clusterHasActiveMaster.notifyAll();
134 }
135 }
136 } catch (KeeperException ke) {
137 master.abort("Received an unexpected KeeperException, aborting", ke);
138 }
139 }
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 boolean blockUntilBecomingActiveMaster(
156 int checkInterval, MonitoredTask startupStatus) {
157 String backupZNode = ZKUtil.joinZNode(
158 this.watcher.backupMasterAddressesZNode, this.sn.toString());
159 while (!(master.isAborted() || master.isStopped())) {
160 startupStatus.setStatus("Trying to register in ZK as active master");
161
162
163 try {
164 if (MasterAddressTracker.setMasterAddress(this.watcher,
165 this.watcher.getMasterAddressZNode(), this.sn, infoPort)) {
166
167
168
169 if (ZKUtil.checkExists(this.watcher, backupZNode) != -1) {
170 LOG.info("Deleting ZNode for " + backupZNode + " from backup master directory");
171 ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
172 }
173
174 ZNodeClearer.writeMyEphemeralNodeOnDisk(this.sn.toString());
175
176
177 startupStatus.setStatus("Successfully registered as active master.");
178 this.clusterHasActiveMaster.set(true);
179 LOG.info("Registered Active Master=" + this.sn);
180 return true;
181 }
182
183
184
185 this.clusterHasActiveMaster.set(true);
186
187 String msg;
188 byte[] bytes =
189 ZKUtil.getDataAndWatch(this.watcher, this.watcher.getMasterAddressZNode());
190 if (bytes == null) {
191 msg = ("A master was detected, but went down before its address " +
192 "could be read. Attempting to become the next active master");
193 } else {
194 ServerName currentMaster;
195 try {
196 currentMaster = ServerName.parseFrom(bytes);
197 } catch (DeserializationException e) {
198 LOG.warn("Failed parse", e);
199
200 continue;
201 }
202 if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
203 msg = ("Current master has this master's address, " +
204 currentMaster + "; master was restarted? Deleting node.");
205
206 ZKUtil.deleteNode(this.watcher, this.watcher.getMasterAddressZNode());
207
208
209
210 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
211 } else {
212 msg = "Another master is the active master, " + currentMaster +
213 "; waiting to become the next active master";
214 }
215 }
216 LOG.info(msg);
217 startupStatus.setStatus(msg);
218 } catch (KeeperException ke) {
219 master.abort("Received an unexpected KeeperException, aborting", ke);
220 return false;
221 }
222 synchronized (this.clusterHasActiveMaster) {
223 while (clusterHasActiveMaster.get() && !master.isStopped()) {
224 try {
225 clusterHasActiveMaster.wait(checkInterval);
226 } catch (InterruptedException e) {
227
228
229 LOG.debug("Interrupted waiting for master to die", e);
230 }
231 }
232 if (clusterShutDown.get()) {
233 this.master.stop(
234 "Cluster went down before this master became active");
235 }
236 }
237 }
238 return false;
239 }
240
241
242
243
244 boolean hasActiveMaster() {
245 try {
246 if (ZKUtil.checkExists(watcher, watcher.getMasterAddressZNode()) >= 0) {
247 return true;
248 }
249 }
250 catch (KeeperException ke) {
251 LOG.info("Received an unexpected KeeperException when checking " +
252 "isActiveMaster : "+ ke);
253 }
254 return false;
255 }
256
257 public void stop() {
258 try {
259 synchronized (clusterHasActiveMaster) {
260
261
262 clusterHasActiveMaster.notifyAll();
263 }
264
265 ServerName activeMaster = null;
266 try {
267 activeMaster = MasterAddressTracker.getMasterAddress(this.watcher);
268 } catch (IOException e) {
269 LOG.warn("Failed get of master address: " + e.toString());
270 }
271 if (activeMaster != null && activeMaster.equals(this.sn)) {
272 ZKUtil.deleteNode(watcher, watcher.getMasterAddressZNode());
273
274
275 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
276 }
277 } catch (KeeperException e) {
278 LOG.error(this.watcher.prefix("Error deleting our own master address node"), e);
279 }
280 }
281 }