/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.StringUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Class to publish the cluster status to the clients. This allows them to know immediately
 * which region servers are dead, and hence to cut their connections to those servers and stop
 * waiting on the sockets. This improves the mean time to recover, and also allows clients to
 * increase their various timeouts, since dead servers are detected separately.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends Chore {
  /**
   * The implementation class used to publish the status. The default implementation, per
   * DEFAULT_STATUS_PUBLISHER_CLASS below, is MulticastPublisher, which multicasts the status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
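  // Both settings can be set in hbase-site.xml. An illustrative snippet (the values shown
  // here are the defaults defined above, and the inner class is referenced with '$' as
  // required for reflective loading):
  //   <property>
  //     <name>hbase.status.publisher.class</name>
  //     <value>org.apache.hadoop.hbase.master.ClusterStatusPublisher$MulticastPublisher</value>
  //   </property>
  //   <property>
  //     <name>hbase.status.publish.period</name>
  //     <value>10000</value>
  //   </property>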

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent =
      new ConcurrentHashMap<ServerName, Integer>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, to fit into a single packet.
   * A reasonable size for IP / Ethernet is less than 1KB.
   */
  public static final int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public static final int NB_SEND = 5;

  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(),
        conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.newInstance();
    } catch (InstantiationException | IllegalAccessException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!connected) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    // hbase:meta server name.
    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
        master.getMasterFileSystem().getClusterId().toString(),
        null,
        sns,
        master.getServerName(),
        null,
        null,
        null,
        null);

    publisher.publish(cs);
  }

  protected void cleanup() {
    connected = false;
    publisher.close();
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at
   * most MAX_SERVER_PER_MESSAGE per message. If there are too many dead servers, we send the
   * most recently dead first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // Get the servers that died recently (within two publish periods) and make sure they
    // are tracked in lastSent.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We send the newly dead servers first: entries with the lowest send count sort first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
    List<ServerName> res = new ArrayList<ServerName>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
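
  // A worked example of the bookkeeping above (illustrative): with NB_SEND = 5, a server that
  // just died enters lastSent with a count of 0, is included in the next five generated lists
  // (counts 0 through 4), and is then dropped from lastSent.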

  /**
   * Get the servers which died since a given timestamp.
   * Protected so that it can be overridden by the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

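  /**
   * Publishes the cluster status. Implementations need a public no-argument constructor,
   * since the class is instantiated reflectively (see the ClusterStatusPublisher constructor
   * above). A minimal custom implementation might look like this (a hypothetical sketch,
   * not part of HBase):
   * <pre>
   * public class LoggingPublisher implements ClusterStatusPublisher.Publisher {
   *   public void connect(Configuration conf) throws IOException { }
   *   public void publish(ClusterStatus cs) {
   *     // e.g. log cs.getDeadServerNames() instead of multicasting the status
   *   }
   *   public void close() { }
   * }
   * </pre>
   */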
  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterStatus cs);

    @Override
    void close();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());

      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
      InternetProtocolFamily family = InternetProtocolFamily.IPv4;
      if (ina instanceof Inet6Address) {
        family = InternetProtocolFamily.IPv6;
      }

      Bootstrap b = new Bootstrap();
      b.group(group)
          .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
          .option(ChannelOption.SO_REUSEADDR, true)
          .handler(new ClusterStatusEncoder(isa));

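      // Bind to an ephemeral local port, join the multicast group on this host's interface,
      // then connect the channel to the multicast address so that writes are sent there.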
      try {
        channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
      private final InetSocketAddress isa;

      private ClusterStatusEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
                            ClusterStatus clusterStatus, List<Object> objects) {
        // Serialize the status to its protobuf form and wrap it in a datagram addressed
        // to the multicast group.
        ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterStatus cs) {
      // The pipeline's ClusterStatusEncoder (above) turns the ClusterStatus into a
      // DatagramPacket before it goes on the wire.
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}