1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import io.netty.bootstrap.Bootstrap;
24 import io.netty.buffer.ByteBufInputStream;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.EventLoopGroup;
28 import io.netty.channel.SimpleChannelInboundHandler;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import io.netty.channel.socket.DatagramChannel;
31 import io.netty.channel.socket.DatagramPacket;
32 import io.netty.channel.socket.nio.NioDatagramChannel;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
38 import org.apache.hadoop.hbase.ClusterStatus;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
42 import org.apache.hadoop.hbase.util.Addressing;
43 import org.apache.hadoop.hbase.util.ExceptionUtil;
44 import org.apache.hadoop.hbase.util.Threads;
45
46 import java.io.Closeable;
47 import java.io.IOException;
48 import java.lang.reflect.Constructor;
49 import java.lang.reflect.InvocationTargetException;
50 import java.net.InetAddress;
51 import java.net.NetworkInterface;
52 import java.net.UnknownHostException;
53 import java.util.ArrayList;
54 import java.util.List;
55
56
57
58
59
60
61
62 @InterfaceAudience.Private
63 class ClusterStatusListener implements Closeable {
64 private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
65 private final List<ServerName> deadServers = new ArrayList<ServerName>();
66 protected final DeadServerHandler deadServerHandler;
67 private final Listener listener;
68
69
70
71
72 public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
73 public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
74 MulticastListener.class;
75
76
77
78
79 public interface DeadServerHandler {
80
81
82
83
84
85
86
87 void newDead(ServerName sn);
88 }
89
90
91
92
93
94 interface Listener extends Closeable {
95
96
97
98 @Override
99 void close();
100
101
102
103
104
105
106
107 void connect(Configuration conf) throws IOException;
108 }
109
110 public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
111 Class<? extends Listener> listenerClass) throws IOException {
112 this.deadServerHandler = dsh;
113 try {
114 Constructor<? extends Listener> ctor =
115 listenerClass.getConstructor(ClusterStatusListener.class);
116 this.listener = ctor.newInstance(this);
117 } catch (InstantiationException e) {
118 throw new IOException("Can't create listener " + listenerClass.getName(), e);
119 } catch (IllegalAccessException e) {
120 throw new IOException("Can't create listener " + listenerClass.getName(), e);
121 } catch (NoSuchMethodException e) {
122 throw new IllegalStateException();
123 } catch (InvocationTargetException e) {
124 throw new IllegalStateException();
125 }
126
127 this.listener.connect(conf);
128 }
129
130
131
132
133
134
135 public void receive(ClusterStatus ncs) {
136 if (ncs.getDeadServerNames() != null) {
137 for (ServerName sn : ncs.getDeadServerNames()) {
138 if (!isDeadServer(sn)) {
139 LOG.info("There is a new dead server: " + sn);
140 deadServers.add(sn);
141 if (deadServerHandler != null) {
142 deadServerHandler.newDead(sn);
143 }
144 }
145 }
146 }
147 }
148
149 @Override
150 public void close() {
151 listener.close();
152 }
153
154
155
156
157
158
159
160 public boolean isDeadServer(ServerName sn) {
161 if (sn.getStartcode() <= 0) {
162 return false;
163 }
164
165 for (ServerName dead : deadServers) {
166 if (dead.getStartcode() >= sn.getStartcode() &&
167 dead.getPort() == sn.getPort() &&
168 dead.getHostname().equals(sn.getHostname())) {
169 return true;
170 }
171 }
172
173 return false;
174 }
175
176
177
178
179
180 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
181 class MulticastListener implements Listener {
182 private DatagramChannel channel;
183 private final EventLoopGroup group = new NioEventLoopGroup(
184 1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
185
186 public MulticastListener() {
187 }
188
189 @Override
190 public void connect(Configuration conf) throws IOException {
191
192 String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
193 HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
194 String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
195 HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
196 int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
197 HConstants.DEFAULT_STATUS_MULTICAST_PORT);
198
199 InetAddress ina;
200 try {
201 ina = InetAddress.getByName(mcAddress);
202 } catch (UnknownHostException e) {
203 close();
204 throw new IOException("Can't connect to " + mcAddress, e);
205 }
206
207 try {
208 Bootstrap b = new Bootstrap();
209 b.group(group)
210 .channel(NioDatagramChannel.class)
211 .option(ChannelOption.SO_REUSEADDR, true)
212 .handler(new ClusterStatusHandler());
213
214 channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
215 } catch (InterruptedException e) {
216 close();
217 throw ExceptionUtil.asInterrupt(e);
218 }
219
220 NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
221 channel.joinGroup(ina, ni, null, channel.newPromise());
222 }
223
224 @Override
225 public void close() {
226 if (channel != null) {
227 channel.close();
228 channel = null;
229 }
230 group.shutdownGracefully();
231 }
232
233
234
235
236
237
238 private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
239
240 @Override
241 public void exceptionCaught(
242 ChannelHandlerContext ctx, Throwable cause)
243 throws Exception {
244 LOG.error("Unexpected exception, continuing.", cause);
245 }
246
247 @Override
248 public boolean acceptInboundMessage(Object msg)
249 throws Exception {
250 return super.acceptInboundMessage(msg);
251 }
252
253
254 @Override
255 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
256 ByteBufInputStream bis = new ByteBufInputStream(dp.content());
257 try {
258 ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
259 ClusterStatus ncs = ClusterStatus.convert(csp);
260 receive(ncs);
261 } finally {
262 bis.close();
263 }
264 }
265 }
266 }
267 }