View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import io.netty.bootstrap.Bootstrap;
24  import io.netty.buffer.ByteBufInputStream;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.SimpleChannelInboundHandler;
29  import io.netty.channel.nio.NioEventLoopGroup;
30  import io.netty.channel.socket.DatagramChannel;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.nio.NioDatagramChannel;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
38  import org.apache.hadoop.hbase.ClusterStatus;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
42  import org.apache.hadoop.hbase.util.Addressing;
43  import org.apache.hadoop.hbase.util.ExceptionUtil;
44  import org.apache.hadoop.hbase.util.Threads;
45  
46  import java.io.Closeable;
47  import java.io.IOException;
48  import java.lang.reflect.Constructor;
49  import java.lang.reflect.InvocationTargetException;
50  import java.net.InetAddress;
51  import java.net.NetworkInterface;
52  import java.net.UnknownHostException;
53  import java.util.ArrayList;
54  import java.util.List;
55  
56  
57  /**
58   * A class that receives the cluster status, and provide it as a set of service to the client.
59   * Today, manages only the dead server list.
60   * The class is abstract to allow multiple implementations, from ZooKeeper to multicast based.
61   */
62  @InterfaceAudience.Private
63  class ClusterStatusListener implements Closeable {
64    private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
65    private final List<ServerName> deadServers = new ArrayList<ServerName>();
66    protected final DeadServerHandler deadServerHandler;
67    private final Listener listener;
68  
69    /**
70     * The implementation class to use to read the status.
71     */
72    public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
73    public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
74        MulticastListener.class;
75  
76    /**
77     * Class to be extended to manage a new dead server.
78     */
79    public interface DeadServerHandler {
80  
81      /**
82       * Called when a server is identified as dead. Called only once even if we receive the
83       * information multiple times.
84       *
85       * @param sn - the server name
86       */
87      void newDead(ServerName sn);
88    }
89  
90  
91    /**
92     * The interface to be implemented by a listener of a cluster status event.
93     */
94    interface Listener extends Closeable {
95      /**
96       * Called to close the resources, if any. Cannot throw an exception.
97       */
98      @Override
99      void close();
100 
101     /**
102      * Called to connect.
103      *
104      * @param conf Configuration to use.
105      * @throws IOException
106      */
107     void connect(Configuration conf) throws IOException;
108   }
109 
110   public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
111                                Class<? extends Listener> listenerClass) throws IOException {
112     this.deadServerHandler = dsh;
113     try {
114       Constructor<? extends Listener> ctor =
115           listenerClass.getConstructor(ClusterStatusListener.class);
116       this.listener = ctor.newInstance(this);
117     } catch (InstantiationException e) {
118       throw new IOException("Can't create listener " + listenerClass.getName(), e);
119     } catch (IllegalAccessException e) {
120       throw new IOException("Can't create listener " + listenerClass.getName(), e);
121     } catch (NoSuchMethodException e) {
122       throw new IllegalStateException();
123     } catch (InvocationTargetException e) {
124       throw new IllegalStateException();
125     }
126 
127     this.listener.connect(conf);
128   }
129 
130   /**
131    * Acts upon the reception of a new cluster status.
132    *
133    * @param ncs the cluster status
134    */
135   public void receive(ClusterStatus ncs) {
136     if (ncs.getDeadServerNames() != null) {
137       for (ServerName sn : ncs.getDeadServerNames()) {
138         if (!isDeadServer(sn)) {
139           LOG.info("There is a new dead server: " + sn);
140           deadServers.add(sn);
141           if (deadServerHandler != null) {
142             deadServerHandler.newDead(sn);
143           }
144         }
145       }
146     }
147   }
148 
149   @Override
150   public void close() {
151     listener.close();
152   }
153 
154   /**
155    * Check if we know if a server is dead.
156    *
157    * @param sn the server name to check.
158    * @return true if we know for sure that the server is dead, false otherwise.
159    */
160   public boolean isDeadServer(ServerName sn) {
161     if (sn.getStartcode() <= 0) {
162       return false;
163     }
164 
165     for (ServerName dead : deadServers) {
166       if (dead.getStartcode() >= sn.getStartcode() &&
167           dead.getPort() == sn.getPort() &&
168           dead.getHostname().equals(sn.getHostname())) {
169         return true;
170       }
171     }
172 
173     return false;
174   }
175 
176 
177   /**
178    * An implementation using a multicast message between the master & the client.
179    */
180   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
181   class MulticastListener implements Listener {
182     private DatagramChannel channel;
183     private final EventLoopGroup group = new NioEventLoopGroup(
184         1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
185 
186     public MulticastListener() {
187     }
188 
189     @Override
190     public void connect(Configuration conf) throws IOException {
191 
192       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
193           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
194       String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
195         HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
196       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
197           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
198 
199       InetAddress ina;
200       try {
201         ina = InetAddress.getByName(mcAddress);
202       } catch (UnknownHostException e) {
203         close();
204         throw new IOException("Can't connect to " + mcAddress, e);
205       }
206 
207       try {
208         Bootstrap b = new Bootstrap();
209         b.group(group)
210             .channel(NioDatagramChannel.class)
211             .option(ChannelOption.SO_REUSEADDR, true)
212             .handler(new ClusterStatusHandler());
213 
214         channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
215       } catch (InterruptedException e) {
216         close();
217         throw ExceptionUtil.asInterrupt(e);
218       }
219 
220       NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
221       channel.joinGroup(ina, ni, null, channel.newPromise());
222     }
223 
224     @Override
225     public void close() {
226       if (channel != null) {
227         channel.close();
228         channel = null;
229       }
230       group.shutdownGracefully();
231     }
232 
233 
234 
235     /**
236      * Class, conforming to the Netty framework, that manages the message received.
237      */
238     private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
239 
240       @Override
241       public void exceptionCaught(
242           ChannelHandlerContext ctx, Throwable cause)
243           throws Exception {
244         LOG.error("Unexpected exception, continuing.", cause);
245       }
246 
247       @Override
248       public boolean acceptInboundMessage(Object msg)
249           throws Exception {
250         return super.acceptInboundMessage(msg);
251       }
252 
253 
254       @Override
255       protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
256         ByteBufInputStream bis = new ByteBufInputStream(dp.content());
257         try {
258           ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
259           ClusterStatus ncs = ClusterStatus.convert(csp);
260           receive(ncs);
261         } finally {
262           bis.close();
263         }
264       }
265     }
266   }
267 }