/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.StringUtil;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;

/**
 * Class to publish the cluster status to the client. This allows them to know immediately
 *  the dead region servers, hence to cut the connection they have with them, eventually stop
 *  waiting on the socket. This improves the mean time to recover, and as well allows to increase
 *  on the client the different timeouts, as the dead servers will be detected separately.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  /**
   * The implementation class used to publish the status. Defaults to the multicast publisher
   *  below.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
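
  // For illustration: the two settings above can be tuned programmatically, or with the same
  // keys in hbase-site.xml. A minimal sketch, assuming a standard HBaseConfiguration; the
  // 5000 ms period is an arbitrary example value:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(STATUS_PUBLISH_PERIOD, 5000);
  //   conf.setClass(STATUS_PUBLISHER_CLASS, MulticastPublisher.class, Publisher.class);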

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages, in ms
  private final ConcurrentMap<ServerName, Integer> lastSent =
      new ConcurrentHashMap<ServerName, Integer>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, so that it fits into a single
   *  packet: a reasonable size for IP / Ethernet is less than 1Kb.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we're sending the information multiple times in case a receiver misses
   *  the message.
   */
  public final static int NB_SEND = 5;

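  // Worked example with the defaults above: a newly dead server appears in up to NB_SEND = 5
  // successive messages, one per DEFAULT_STATUS_PUBLISH_PERIOD = 10000 ms, so clients get
  // roughly 5 * 10 s = 50 s of repeated announcements to catch at least one packet, while a
  // single message never carries more than MAX_SERVER_PER_MESSAGE = 10 server names.
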
  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
        STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }
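
  // For illustration: the master creates this chore only when status publishing is enabled
  // (an assumption about wiring that lives outside this class), typically via hbase-site.xml:
  //
  //   <property>
  //     <name>hbase.status.published</name>
  //     <value>true</value>
  //   </property>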

  // Empty constructor, for the tests and subclasses only.
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!connected) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than a period ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
        master.getMasterFileSystem().getClusterId().toString(),
        null,
        sns,
        master.getServerName(),
        null,
        null,
        null,
        null);

    publisher.publish(cs);
  }

  @Override
  protected void cleanup() {
    connected = false;
    publisher.close();
  }

  /**
   * Create the dead server list to send. A dead server is sent NB_SEND times. We send at most
   *  MAX_SERVER_PER_MESSAGE servers at a time. If there are too many dead servers, we send the
   *  newly dead first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // Collect the servers that died since the last message, and add them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We're sending the newly dead servers first: sort by the number of times already sent.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE.
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<ServerName>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }

  /**
   * Get the servers which died since a given timestamp.
   * Protected so that it can be subclassed by the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  /**
   * The publication mechanism. Implementations are instantiated by reflection (see the
   *  constructor above), so they must provide a no-argument constructor.
   */
  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterStatus cs);

    @Override
    void close();
  }
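
  // Minimal sketch of an alternative Publisher, pluggable through hbase.status.publisher.class.
  // Illustrative only: the class name is hypothetical, and it assumes the commons-logging
  // Log/LogFactory classes used elsewhere in HBase.
  //
  //   public static class LoggingPublisher implements Publisher {
  //     private static final Log LOG = LogFactory.getLog(LoggingPublisher.class);
  //
  //     @Override
  //     public void connect(Configuration conf) throws IOException {
  //       // Nothing to set up: we only log locally.
  //     }
  //
  //     @Override
  //     public void publish(ClusterStatus cs) {
  //       LOG.info("Publishing cluster status with " + cs.getDeadServerNames().size()
  //           + " dead server(s)");
  //     }
  //
  //     @Override
  //     public void close() {
  //     }
  //   }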

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      InetAddress localAddress;
      if (ina instanceof Inet6Address) {
        localAddress = Addressing.getIp6Address();
        family = InternetProtocolFamily.IPv6;
      } else {
        localAddress = Addressing.getIp4Address();
        family = InternetProtocolFamily.IPv4;
      }
      NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);

      Bootstrap b = new Bootstrap();
      b.group(group)
          .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class,
              family))
          .option(ChannelOption.SO_REUSEADDR, true)
          .handler(new ClusterStatusEncoder(isa));

      try {
        channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
      private final InetSocketAddress isa;

      private ClusterStatusEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
                            ClusterStatus clusterStatus, List<Object> objects) {
        ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterStatus cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}