/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * Publishes the cluster status to clients. This lets them learn immediately which region
 * servers are dead, so they can cut the connections they hold to those servers instead of
 * waiting on the sockets to time out. This improves the mean time to recover, and also
 * allows clients to use larger timeouts, since dead servers are detected through a
 * separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends Chore {
  /**
   * The implementation class used to publish the status. The default is
   * {@link MulticastPublisher}, which multicasts the status; note that publishing only
   * happens at all when it is enabled separately (via hbase.status.published).
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
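
  // A minimal sketch of how these keys might be wired together (illustrative, not
  // taken from this file; publishing itself is gated by hbase.status.published):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("hbase.status.published", true);
  //   conf.set(STATUS_PUBLISHER_CLASS, DEFAULT_STATUS_PUBLISHER_CLASS.getName());
  //   conf.setInt(STATUS_PUBLISH_PERIOD, 10000); // at most one message per 10 s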

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent =
      new ConcurrentHashMap<ServerName, Integer>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, so that it fits into a single
   * packet. A reasonable size for IP / Ethernet is less than 1 KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we're sending the information multiple times in case a receiver
   * misses the message.
   */
  public final static int NB_SEND = 5;
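
  // With the defaults above, a dead server is therefore re-announced in up to NB_SEND
  // consecutive messages, i.e. for roughly NB_SEND * DEFAULT_STATUS_PUBLISH_PERIOD
  // = 5 * 10000 ms = 50 seconds (assuming a message actually goes out every period).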

  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(),
        conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!connected) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
        master.getMasterFileSystem().getClusterId().toString(),
        null,
        sns,
        master.getServerName(),
        null,
        null,
        null,
        null);

    publisher.publish(cs);
  }

  @Override
  protected void cleanup() {
    connected = false;
    publisher.close();
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send
   * at most MAX_SERVER_PER_MESSAGE servers in a single message. If there are too many dead
   * servers, we send the newly dead ones first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We get the servers that died since the last message, and add them to the list
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We send the newly dead servers first, i.e. those that have been sent the fewest times.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
    List<ServerName> res = new ArrayList<ServerName>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        // This is the last time we send this server: drop it from the map.
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
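
  // Worked example with hypothetical numbers: if lastSent holds 12 servers, the sort
  // above puts the least-announced (i.e. most recently dead) ones first, only the first
  // MAX_SERVER_PER_MESSAGE = 10 make it into this message, and a server whose counter
  // has reached NB_SEND - 1 = 4 is included one last (fifth) time before being removed
  // from lastSent.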

  /**
   * Get the servers which died since a given timestamp.
   * Protected so that it can be overridden by tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterStatus cs);

    @Override
    void close();
  }
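
  // A hypothetical alternative Publisher, sketched for illustration only (it is not
  // part of HBase): it would log the dead servers instead of multicasting them. Note
  // that an implementation needs a public no-arg constructor, since the constructor
  // above instantiates it reflectively with newInstance().
  //
  //   public static class LoggingPublisher implements Publisher {
  //     @Override
  //     public void connect(Configuration conf) throws IOException {
  //     }
  //
  //     @Override
  //     public void publish(ClusterStatus cs) {
  //       System.out.println("Dead servers: " + cs.getDeadServerNames());
  //     }
  //
  //     @Override
  //     public void close() {
  //     }
  //   }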

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());

      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      Bootstrap b = new Bootstrap();
      b.group(group)
          .channel(NioDatagramChannel.class)
          .option(ChannelOption.SO_REUSEADDR, true)
          .handler(new ClusterStatusEncoder(isa));

      try {
        // Bind to an ephemeral local port, join the multicast group on the local
        // interface, then connect the channel so writes go to the group address.
        channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
      private final InetSocketAddress isa;

      private ClusterStatusEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
                            ClusterStatus clusterStatus, List<Object> objects) {
        // Serialize the status as a protobuf and wrap it in a single datagram; this is
        // why the publisher caps each message at MAX_SERVER_PER_MESSAGE dead servers.
        ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterStatus cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
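
  // On the receiving side (for context; this lives elsewhere in HBase), these datagrams
  // are consumed by org.apache.hadoop.hbase.client.ClusterStatusListener, which decodes
  // the protobuf and marks the advertised servers as dead without waiting for socket
  // timeouts.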
}