
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.StringUtil;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;

/**
 * Class to publish the cluster status to the clients. This allows the clients to learn about
 * dead region servers immediately, so they can cut their connections to those servers and stop
 * waiting on the sockets. This improves the mean time to recover, and also allows the client
 * timeouts to be increased, since dead servers are detected through a separate channel.
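 * <p>
 * A minimal sketch of enabling the multicast publisher through configuration (how the master
 * consumes these keys is outside this class; the keys are the constants defined below):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
 *     ClusterStatusPublisher.MulticastPublisher.class.getName());
 * conf.setInt(ClusterStatusPublisher.STATUS_PUBLISH_PERIOD, 10000); // in milliseconds
 * </pre>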
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  /**
   * The implementation class used to publish the status. Default is null (no publishing).
   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast
   * the status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  // Map of dead servers to the number of times their death has already been published
  private final ConcurrentMap<ServerName, Integer> lastSent =
      new ConcurrentHashMap<ServerName, Integer>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, to fit into a single packet.
   * A reasonable size for IP / Ethernet is less than 1 KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we're sending the information multiple times in case a receiver misses
   * the message.
   */
  public final static int NB_SEND = 5;

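  /**
   * @param master the master whose cluster status is published
   * @param conf the configuration; {@link #STATUS_PUBLISH_PERIOD} sets the publish period
   * @param publisherClass the {@link Publisher} implementation to instantiate and connect
   * @throws IOException if the publisher cannot be instantiated or fails to connect
   */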
  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
      STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

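  /**
   * Publishes the current dead-server list, at most once per message period.
   */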
  @Override
  protected void chore() {
    if (!connected) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than a message period ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
        master.getMasterFileSystem().getClusterId().toString(),
        null,
        sns,
        master.getServerName(),
        null,
        null,
        null,
        null);

    publisher.publish(cs);
  }

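  /**
   * Stops publishing and closes the underlying publisher.
   */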
  protected void cleanup() {
    connected = false;
    publisher.close();
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at
   * most MAX_SERVER_PER_MESSAGE servers at a time. If there are too many dead servers, we send
   * the most recently dead first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // Get the servers that died recently (within two message periods) and add them to the list
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We're sending the new deads first: they have the lowest send count.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<ServerName>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        // This server has now been sent NB_SEND times: stop tracking it.
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }

  /**
   * Get the servers which died since a given timestamp. Protected so it can be overridden by
   * tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  /**
   * The interface to implement to publish the status. See {@link MulticastPublisher} for the
   * default multicast implementation.
   */
  public interface Publisher extends Closeable {

    /** Connects the publisher; called once before any publish. */
    void connect(Configuration conf) throws IOException;

    /** Publishes the given cluster status. */
    void publish(ClusterStatus cs);

    @Override
    void close();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

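    /**
     * Binds a datagram channel, joins the configured multicast group, and connects the channel
     * to the group address so that {@link #publish(ClusterStatus)} only has to write to it.
     */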
    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      InetAddress localAddress;
      if (ina instanceof Inet6Address) {
        localAddress = Addressing.getIp6Address();
        family = InternetProtocolFamily.IPv6;
      } else {
        localAddress = Addressing.getIp4Address();
        family = InternetProtocolFamily.IPv4;
      }
      NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);

      Bootstrap b = new Bootstrap();
      b.group(group)
          .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class,
              family))
          .option(ChannelOption.SO_REUSEADDR, true)
          .handler(new ClusterStatusEncoder(isa));

      try {
        channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          // The IP family must be passed to the channel's constructor, so we instantiate it
          // reflectively with a custom constructor instead of using the no-arg default.
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
      private final InetSocketAddress isa;

      private ClusterStatusEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
                            ClusterStatus clusterStatus, List<Object> objects) {
        // Serialize the status to its protobuf form and wrap it in a single datagram packet.
        ClusterStatusProtos.ClusterStatus csp = ProtobufUtil.convert(clusterStatus);
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
      }
    }

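    /**
     * Writes the status to the multicast channel and blocks until the write is flushed.
     */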
    @Override
    public void publish(ClusterStatus cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}