1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  package org.apache.hadoop.hbase.master;
22  
23  
24  import io.netty.bootstrap.Bootstrap;
25  import io.netty.bootstrap.ChannelFactory;
26  import io.netty.buffer.Unpooled;
27  import io.netty.channel.Channel;
28  import io.netty.channel.ChannelException;
29  import io.netty.channel.ChannelHandlerContext;
30  import io.netty.channel.ChannelOption;
31  import io.netty.channel.EventLoopGroup;
32  import io.netty.channel.nio.NioEventLoopGroup;
33  import io.netty.channel.socket.DatagramChannel;
34  import io.netty.channel.socket.DatagramPacket;
35  import io.netty.channel.socket.InternetProtocolFamily;
36  import io.netty.channel.socket.nio.NioDatagramChannel;
37  import io.netty.handler.codec.MessageToMessageEncoder;
38  import io.netty.util.internal.StringUtil;
39  
40  import java.io.Closeable;
41  import java.io.IOException;
42  import java.net.Inet6Address;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.NetworkInterface;
46  import java.net.UnknownHostException;
47  import java.util.ArrayList;
48  import java.util.Collections;
49  import java.util.Comparator;
50  import java.util.List;
51  import java.util.Map;
52  import java.util.concurrent.ConcurrentHashMap;
53  import java.util.concurrent.ConcurrentMap;
54  
55  import org.apache.hadoop.conf.Configuration;
56  import org.apache.hadoop.hbase.ClusterStatus;
57  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
58  import org.apache.hadoop.hbase.HConstants;
59  import org.apache.hadoop.hbase.ScheduledChore;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.classification.InterfaceAudience;
62  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
63  import org.apache.hadoop.hbase.util.Addressing;
64  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65  import org.apache.hadoop.hbase.util.ExceptionUtil;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.hbase.util.ReflectionUtils;
68  import org.apache.hadoop.hbase.util.Threads;
69  import org.apache.hadoop.hbase.util.VersionInfo;
70  
71  
72  
73  
74  
75  
76  
77  
78  @InterfaceAudience.Private
79  public class ClusterStatusPublisher extends ScheduledChore {
80    
81  
82  
83  
84  
85    public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
86    public static final Class<? extends ClusterStatusPublisher.Publisher>
87        DEFAULT_STATUS_PUBLISHER_CLASS =
88        org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;
89  
90    
91  
92  
93    public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
94    public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
95  
96    private long lastMessageTime = 0;
97    private final HMaster master;
98    private final int messagePeriod; 
99    private final ConcurrentMap<ServerName, Integer> lastSent =
100       new ConcurrentHashMap<ServerName, Integer>();
101   private Publisher publisher;
102   private boolean connected = false;
103 
104   
105 
106 
107 
108   public final static int MAX_SERVER_PER_MESSAGE = 10;
109 
110   
111 
112 
113 
114   public final static int NB_SEND = 5;
115 
116   public ClusterStatusPublisher(HMaster master, Configuration conf,
117                                 Class<? extends Publisher> publisherClass)
118       throws IOException {
119     super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
120       STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
121     this.master = master;
122     this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
123     try {
124       this.publisher = publisherClass.newInstance();
125     } catch (InstantiationException e) {
126       throw new IOException("Can't create publisher " + publisherClass.getName(), e);
127     } catch (IllegalAccessException e) {
128       throw new IOException("Can't create publisher " + publisherClass.getName(), e);
129     }
130     this.publisher.connect(conf);
131     connected = true;
132   }
133 
134   
135   protected ClusterStatusPublisher() {
136     master = null;
137     messagePeriod = 0;
138   }
139 
140   @Override
141   protected void chore() {
142     if (!connected) {
143       return;
144     }
145 
146     List<ServerName> sns = generateDeadServersListToSend();
147     if (sns.isEmpty()) {
148       
149       return;
150     }
151 
152     final long curTime = EnvironmentEdgeManager.currentTime();
153     if (lastMessageTime > curTime - messagePeriod) {
154       
155       return;
156     }
157 
158     
159     lastMessageTime = curTime;
160 
161     
162     
163     
164     ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
165         master.getMasterFileSystem().getClusterId().toString(),
166         null,
167         sns,
168         master.getServerName(),
169         null,
170         null,
171         null,
172         null);
173 
174 
175     publisher.publish(cs);
176   }
177 
178   protected void cleanup() {
179     connected = false;
180     publisher.close();
181   }
182 
183   
184 
185 
186 
187 
188   protected List<ServerName> generateDeadServersListToSend() {
189     
190     long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
191     for (Pair<ServerName, Long> dead : getDeadServers(since)) {
192       lastSent.putIfAbsent(dead.getFirst(), 0);
193     }
194 
195     
196     List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
197     entries.addAll(lastSent.entrySet());
198     Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
199       @Override
200       public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
201         return o1.getValue().compareTo(o2.getValue());
202       }
203     });
204 
205     
206     int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
207     List<ServerName> res = new ArrayList<ServerName>(max);
208 
209     for (int i = 0; i < max; i++) {
210       Map.Entry<ServerName, Integer> toSend = entries.get(i);
211       if (toSend.getValue() >= (NB_SEND - 1)) {
212         lastSent.remove(toSend.getKey());
213       } else {
214         lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
215       }
216 
217       res.add(toSend.getKey());
218     }
219 
220     return res;
221   }
222 
223   
224 
225 
226 
227   protected List<Pair<ServerName, Long>> getDeadServers(long since) {
228     if (master.getServerManager() == null) {
229       return Collections.emptyList();
230     }
231 
232     return master.getServerManager().getDeadServers().copyDeadServersSince(since);
233   }
234 
235 
236   public interface Publisher extends Closeable {
237 
238     void connect(Configuration conf) throws IOException;
239 
240     void publish(ClusterStatus cs);
241 
242     @Override
243     void close();
244   }
245 
246   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
247   public static class MulticastPublisher implements Publisher {
248     private DatagramChannel channel;
249     private final EventLoopGroup group = new NioEventLoopGroup(
250         1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));
251 
252     public MulticastPublisher() {
253     }
254 
255     @Override
256     public void connect(Configuration conf) throws IOException {
257       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
258           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
259       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
260           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
261 
262       final InetAddress ina;
263       try {
264         ina = InetAddress.getByName(mcAddress);
265       } catch (UnknownHostException e) {
266         close();
267         throw new IOException("Can't connect to " + mcAddress, e);
268       }
269 
270       final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
271 
272       InternetProtocolFamily family;
273       InetAddress localAddress;
274       if (ina instanceof Inet6Address) {
275         localAddress = Addressing.getIp6Address();
276         family = InternetProtocolFamily.IPv6;
277       }else{
278         localAddress = Addressing.getIp4Address();
279         family = InternetProtocolFamily.IPv4;
280       }
281       NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);
282 
283       Bootstrap b = new Bootstrap();
284       b.group(group)
285       .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
286       .option(ChannelOption.SO_REUSEADDR, true)
287       .handler(new ClusterStatusEncoder(isa));
288 
289       try {
290         channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
291         channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
292         channel.connect(isa).sync();
293       } catch (InterruptedException e) {
294         close();
295         throw ExceptionUtil.asInterrupt(e);
296       }
297     }
298 
299     private static final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> {
300       private final Class<? extends T> clazz;
301       private InternetProtocolFamily family;
302 
303       HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
304           this.clazz = clazz;
305           this.family = family;
306       }
307 
308       @Override
309       public T newChannel() {
310           try {
311             return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
312               new Class[] { InternetProtocolFamily.class }, new Object[] { family });
313 
314           } catch (Throwable t) {
315               throw new ChannelException("Unable to create Channel from class " + clazz, t);
316           }
317       }
318 
319       @Override
320       public String toString() {
321           return StringUtil.simpleClassName(clazz) + ".class";
322       }
323   }
324 
325     private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
326       final private InetSocketAddress isa;
327 
328       private ClusterStatusEncoder(InetSocketAddress isa) {
329         this.isa = isa;
330       }
331 
332       @Override
333       protected void encode(ChannelHandlerContext channelHandlerContext,
334                             ClusterStatus clusterStatus, List<Object> objects) {
335         ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
336         objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
337       }
338     }
339 
340     @Override
341     public void publish(ClusterStatus cs) {
342       channel.writeAndFlush(cs).syncUninterruptibly();
343     }
344 
345     @Override
346     public void close() {
347       if (channel != null) {
348         channel.close();
349       }
350       group.shutdownGracefully();
351     }
352   }
353 }