
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;


/**
 * A class that receives the cluster status and provides it as a set of services to the client.
 * Today it manages only the dead server list.
 * The nested {@link Listener} interface allows multiple implementations, from ZooKeeper based
 * to multicast based; the implementation to use is selected with {@link #STATUS_LISTENER_CLASS}.
 */
@InterfaceAudience.Private
class ClusterStatusListener implements Closeable {
  private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
  private final List<ServerName> deadServers = new ArrayList<ServerName>();
  protected final DeadServerHandler deadServerHandler;
  private final Listener listener;

  /**
   * The name of the configuration parameter that gives the implementation class used to read
   * the status.
   */
  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
      MulticastListener.class;
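
  // Example (hypothetical client-side usage, not taken from this class): the listener
  // implementation can be selected through the configuration, e.g.
  //   conf.setClass(STATUS_LISTENER_CLASS, MulticastListener.class, Listener.class);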

  /**
   * Interface to be implemented by a client that wants to be notified of new dead servers.
   */
  public interface DeadServerHandler {

    /**
     * Called when a server is identified as dead. Called only once, even if we receive the
     * information multiple times.
     *
     * @param sn the name of the dead server
     */
    void newDead(ServerName sn);
  }


  /**
   * The interface to be implemented by a listener of a cluster status event.
   */
  interface Listener extends Closeable {
    /**
     * Called to close the resources, if any. Cannot throw an exception.
     */
    @Override
    void close();

    /**
     * Called to connect.
     *
     * @param conf Configuration to use.
     * @throws IOException if the connection cannot be established
     */
    void connect(Configuration conf) throws IOException;
  }

  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
                               Class<? extends Listener> listenerClass) throws IOException {
    this.deadServerHandler = dsh;
    try {
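      // The Listener implementation is expected to be an inner class of ClusterStatusListener
      // (as MulticastListener is), so it exposes a constructor taking the enclosing instance.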
      Constructor<? extends Listener> ctor =
          listenerClass.getConstructor(ClusterStatusListener.class);
      this.listener = ctor.newInstance(this);
    } catch (InstantiationException e) {
      throw new IOException("Can't create listener " + listenerClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't create listener " + listenerClass.getName(), e);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException("Listener " + listenerClass.getName() +
          " must have a constructor taking a ClusterStatusListener", e);
    } catch (InvocationTargetException e) {
      throw new IllegalStateException("Can't create listener " + listenerClass.getName(), e);
    }

    this.listener.connect(conf);
  }

  /**
   * Acts upon the reception of a new cluster status.
   *
   * @param ncs the cluster status
   */
  public void receive(ClusterStatus ncs) {
    if (ncs.getDeadServerNames() != null) {
      for (ServerName sn : ncs.getDeadServerNames()) {
        if (!isDeadServer(sn)) {
          LOG.info("There is a new dead server: " + sn);
          deadServers.add(sn);
          if (deadServerHandler != null) {
            deadServerHandler.newDead(sn);
          }
        }
      }
    }
  }

  @Override
  public void close() {
    listener.close();
  }

  /**
   * Check whether we know that a given server is dead.
   *
   * @param sn the server name to check.
   * @return true if we know for sure that the server is dead, false otherwise.
   */
  public boolean isDeadServer(ServerName sn) {
    if (sn.getStartcode() <= 0) {
      return false;
    }

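    // A server instance is identified by its host, port and start code. If a dead server is
    // known for the same host and port with the same or a newer start code, this instance
    // cannot still be alive.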
    for (ServerName dead : deadServers) {
      if (dead.getStartcode() >= sn.getStartcode() &&
          dead.getPort() == sn.getPort() &&
          dead.getHostname().equals(sn.getHostname())) {
        return true;
      }
    }

    return false;
  }


  /**
   * An implementation using multicast messages sent from the master to the clients.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  class MulticastListener implements Listener {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));

    public MulticastListener() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {

      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
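      // The multicast group address and port must match the ones the master publishes the
      // status on; the bind address only controls the local socket of this client.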

      InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      try {
        Bootstrap b = new Bootstrap();
        b.group(group)
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.SO_REUSEADDR, true)
            .handler(new ClusterStatusHandler());

        channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }

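      // Join the multicast group on the network interface that owns the local address returned
      // by Addressing.getIpAddress(), so datagrams sent to the group reach this channel.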
      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
      channel.joinGroup(ina, ni, null, channel.newPromise());
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
        channel = null;
      }
      group.shutdownGracefully();
    }



    /**
     * Netty handler that processes the cluster status messages as they are received.
     */
    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {

      @Override
      public void exceptionCaught(
          ChannelHandlerContext ctx, Throwable cause)
          throws Exception {
        LOG.error("Unexpected exception, continuing.", cause);
      }

      @Override
      public boolean acceptInboundMessage(Object msg)
          throws Exception {
        return super.acceptInboundMessage(msg);
      }


      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
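        // Decode the protobuf-encoded ClusterStatus from the datagram payload and hand the
        // result to the enclosing listener.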
        ByteBufInputStream bis = new ByteBufInputStream(dp.content());
        try {
          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
          ClusterStatus ncs = ClusterStatus.convert(csp);
          receive(ncs);
        } finally {
          bis.close();
        }
      }
    }
  }
}