View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
56  
57  
58  /**
 * A class that receives the cluster status and provides it as a set of services to the client.
 * Today, it manages only the dead server list.
 * The transport is pluggable through the {@link Listener} interface to allow multiple
 * implementations, from ZooKeeper to multicast based.
62   */
63  @InterfaceAudience.Private
64  class ClusterStatusListener implements Closeable {
65    private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
66    private final List<ServerName> deadServers = new ArrayList<ServerName>();
67    protected final DeadServerHandler deadServerHandler;
68    private final Listener listener;
69  
  /**
   * The implementation class to use to read the status.
   * NOTE(review): the value looks like a configuration property name; the code that reads it
   * is not part of this class - confirm against the caller.
   */
  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";

  /**
   * Default {@link Listener} implementation class: the multicast-based
   * {@link MulticastListener}.
   */
  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
      MulticastListener.class;
76  
  /**
   * Callback interface to be implemented by code that wants to be told about each newly
   * discovered dead server.
   */
  public interface DeadServerHandler {

    /**
     * Called when a server is identified as dead. Called only once even if we receive the
     * information multiple times.
     *
     * @param sn - the server name
     */
    void newDead(ServerName sn);
  }
90  
91  
  /**
   * The interface to be implemented by a listener of a cluster status event.
   * <p>
   * Implementations are instantiated reflectively by the {@link ClusterStatusListener}
   * constructor, which looks up a public constructor taking a {@link ClusterStatusListener}
   * and then calls {@link #connect(Configuration)} on the new instance.
   */
  interface Listener extends Closeable {
    /**
     * Called to close the resources, if any. Cannot throw an exception.
     */
    @Override
    void close();

    /**
     * Called to connect.
     *
     * @param conf Configuration to use.
     * @throws IOException if the connection cannot be established
     */
    void connect(Configuration conf) throws IOException;
  }
110 
111   public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
112                                Class<? extends Listener> listenerClass) throws IOException {
113     this.deadServerHandler = dsh;
114     try {
115       Constructor<? extends Listener> ctor =
116           listenerClass.getConstructor(ClusterStatusListener.class);
117       this.listener = ctor.newInstance(this);
118     } catch (InstantiationException e) {
119       throw new IOException("Can't create listener " + listenerClass.getName(), e);
120     } catch (IllegalAccessException e) {
121       throw new IOException("Can't create listener " + listenerClass.getName(), e);
122     } catch (NoSuchMethodException e) {
123       throw new IllegalStateException();
124     } catch (InvocationTargetException e) {
125       throw new IllegalStateException();
126     }
127 
128     this.listener.connect(conf);
129   }
130 
131   /**
132    * Acts upon the reception of a new cluster status.
133    *
134    * @param ncs the cluster status
135    */
136   public void receive(ClusterStatus ncs) {
137     if (ncs.getDeadServerNames() != null) {
138       for (ServerName sn : ncs.getDeadServerNames()) {
139         if (!isDeadServer(sn)) {
140           LOG.info("There is a new dead server: " + sn);
141           deadServers.add(sn);
142           if (deadServerHandler != null) {
143             deadServerHandler.newDead(sn);
144           }
145         }
146       }
147     }
148   }
149 
150   @Override
151   public void close() {
152     listener.close();
153   }
154 
155   /**
156    * Check if we know if a server is dead.
157    *
158    * @param sn the server name to check.
159    * @return true if we know for sure that the server is dead, false otherwise.
160    */
161   public boolean isDeadServer(ServerName sn) {
162     if (sn.getStartcode() <= 0) {
163       return false;
164     }
165 
166     for (ServerName dead : deadServers) {
167       if (dead.getStartcode() >= sn.getStartcode() &&
168           dead.getPort() == sn.getPort() &&
169           dead.getHostname().equals(sn.getHostname())) {
170         return true;
171       }
172     }
173 
174     return false;
175   }
176 
177 
178   /**
179    * An implementation using a multicast message between the master & the client.
180    */
181   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
182   class MulticastListener implements Listener {
183     private DatagramChannel channel;
184     private final EventLoopGroup group = new NioEventLoopGroup(
185         1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
186 
187     public MulticastListener() {
188     }
189 
190     @Override
191     public void connect(Configuration conf) throws IOException {
192 
193       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
194           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
195       String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
196         HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
197       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
198           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
199 
200       InetAddress ina;
201       try {
202         ina = InetAddress.getByName(mcAddress);
203       } catch (UnknownHostException e) {
204         close();
205         throw new IOException("Can't connect to " + mcAddress, e);
206       }
207 
208       try {
209         Bootstrap b = new Bootstrap();
210         b.group(group)
211             .channel(NioDatagramChannel.class)
212             .option(ChannelOption.SO_REUSEADDR, true)
213             .handler(new ClusterStatusHandler());
214 
215         channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
216       } catch (InterruptedException e) {
217         close();
218         throw ExceptionUtil.asInterrupt(e);
219       }
220 
221       NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
222       channel.joinGroup(ina, ni, null, channel.newPromise());
223     }
224 
225     @Override
226     public void close() {
227       if (channel != null) {
228         channel.close();
229         channel = null;
230       }
231       group.shutdownGracefully();
232     }
233 
234 
235 
236     /**
237      * Class, conforming to the Netty framework, that manages the message received.
238      */
239     private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
240 
241       @Override
242       public void exceptionCaught(
243           ChannelHandlerContext ctx, Throwable cause)
244           throws Exception {
245         LOG.error("Unexpected exception, continuing.", cause);
246       }
247 
248       @Override
249       public boolean acceptInboundMessage(Object msg)
250           throws Exception {
251         return super.acceptInboundMessage(msg);
252       }
253 
254 
255       @Override
256       protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
257         ByteBufInputStream bis = new ByteBufInputStream(dp.content());
258         try {
259           ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
260           ClusterStatus ncs = ClusterStatus.convert(csp);
261           receive(ncs);
262         } finally {
263           bis.close();
264         }
265       }
266     }
267   }
268 }