View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import io.netty.bootstrap.Bootstrap;
24  import io.netty.buffer.ByteBufInputStream;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.SimpleChannelInboundHandler;
29  import io.netty.channel.nio.NioEventLoopGroup;
30  import io.netty.channel.socket.DatagramChannel;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.nio.NioDatagramChannel;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.ClusterStatus;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
41  import org.apache.hadoop.hbase.util.Addressing;
42  import org.apache.hadoop.hbase.util.ExceptionUtil;
43  import org.apache.hadoop.hbase.util.Threads;
44  
45  import java.io.Closeable;
46  import java.io.IOException;
47  import java.lang.reflect.Constructor;
48  import java.lang.reflect.InvocationTargetException;
49  import java.net.InetAddress;
50  import java.net.NetworkInterface;
51  import java.net.UnknownHostException;
52  import java.util.ArrayList;
53  import java.util.List;
54  
55  
56  /**
57   * A class that receives the cluster status, and provide it as a set of service to the client.
58   * Today, manages only the dead server list.
59   * The class is abstract to allow multiple implementations, from ZooKeeper to multicast based.
60   */
61  @InterfaceAudience.Private
62  class ClusterStatusListener implements Closeable {
63    private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
64    private final List<ServerName> deadServers = new ArrayList<ServerName>();
65    protected final DeadServerHandler deadServerHandler;
66    private final Listener listener;
67  
68    /**
69     * The implementation class to use to read the status.
70     */
71    public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
72    public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
73        MulticastListener.class;
74  
75    /**
76     * Class to be extended to manage a new dead server.
77     */
78    public interface DeadServerHandler {
79  
80      /**
81       * Called when a server is identified as dead. Called only once even if we receive the
82       * information multiple times.
83       *
84       * @param sn - the server name
85       */
86      void newDead(ServerName sn);
87    }
88  
89  
90    /**
91     * The interface to be implemented by a listener of a cluster status event.
92     */
93    interface Listener extends Closeable {
94      /**
95       * Called to close the resources, if any. Cannot throw an exception.
96       */
97      @Override
98      void close();
99  
100     /**
101      * Called to connect.
102      *
103      * @param conf Configuration to use.
104      * @throws IOException
105      */
106     void connect(Configuration conf) throws IOException;
107   }
108 
109   public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
110                                Class<? extends Listener> listenerClass) throws IOException {
111     this.deadServerHandler = dsh;
112     try {
113       Constructor<? extends Listener> ctor =
114           listenerClass.getConstructor(ClusterStatusListener.class);
115       this.listener = ctor.newInstance(this);
116     } catch (InstantiationException e) {
117       throw new IOException("Can't create listener " + listenerClass.getName(), e);
118     } catch (IllegalAccessException e) {
119       throw new IOException("Can't create listener " + listenerClass.getName(), e);
120     } catch (NoSuchMethodException e) {
121       throw new IllegalStateException();
122     } catch (InvocationTargetException e) {
123       throw new IllegalStateException();
124     }
125 
126     this.listener.connect(conf);
127   }
128 
129   /**
130    * Acts upon the reception of a new cluster status.
131    *
132    * @param ncs the cluster status
133    */
134   public void receive(ClusterStatus ncs) {
135     if (ncs.getDeadServerNames() != null) {
136       for (ServerName sn : ncs.getDeadServerNames()) {
137         if (!isDeadServer(sn)) {
138           LOG.info("There is a new dead server: " + sn);
139           deadServers.add(sn);
140           if (deadServerHandler != null) {
141             deadServerHandler.newDead(sn);
142           }
143         }
144       }
145     }
146   }
147 
148   @Override
149   public void close() {
150     listener.close();
151   }
152 
153   /**
154    * Check if we know if a server is dead.
155    *
156    * @param sn the server name to check.
157    * @return true if we know for sure that the server is dead, false otherwise.
158    */
159   public boolean isDeadServer(ServerName sn) {
160     if (sn.getStartcode() <= 0) {
161       return false;
162     }
163 
164     for (ServerName dead : deadServers) {
165       if (dead.getStartcode() >= sn.getStartcode() &&
166           dead.getPort() == sn.getPort() &&
167           dead.getHostname().equals(sn.getHostname())) {
168         return true;
169       }
170     }
171 
172     return false;
173   }
174 
175 
176   /**
177    * An implementation using a multicast message between the master & the client.
178    */
179   class MulticastListener implements Listener {
180     private DatagramChannel channel;
181     private final EventLoopGroup group = new NioEventLoopGroup(
182         1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
183 
184     public MulticastListener() {
185     }
186 
187     @Override
188     public void connect(Configuration conf) throws IOException {
189 
190       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
191           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
192       String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
193         HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
194       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
195           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
196 
197       InetAddress ina;
198       try {
199         ina = InetAddress.getByName(mcAddress);
200       } catch (UnknownHostException e) {
201         close();
202         throw new IOException("Can't connect to " + mcAddress, e);
203       }
204 
205       try {
206         Bootstrap b = new Bootstrap();
207         b.group(group)
208             .channel(NioDatagramChannel.class)
209             .option(ChannelOption.SO_REUSEADDR, true)
210             .handler(new ClusterStatusHandler());
211 
212         channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
213       } catch (InterruptedException e) {
214         close();
215         throw ExceptionUtil.asInterrupt(e);
216       }
217 
218       NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
219       channel.joinGroup(ina, ni, null, channel.newPromise());
220     }
221 
222     @Override
223     public void close() {
224       if (channel != null) {
225         channel.close();
226         channel = null;
227       }
228       group.shutdownGracefully();
229     }
230 
231 
232 
233     /**
234      * Class, conforming to the Netty framework, that manages the message received.
235      */
236     private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
237 
238       @Override
239       public void exceptionCaught(
240           ChannelHandlerContext ctx, Throwable cause)
241           throws Exception {
242         LOG.error("Unexpected exception, continuing.", cause);
243       }
244 
245       @Override
246       public boolean acceptInboundMessage(Object msg)
247           throws Exception {
248         return super.acceptInboundMessage(msg);
249       }
250 
251 
252       @Override
253       protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
254         ByteBufInputStream bis = new ByteBufInputStream(dp.content());
255         try {
256           ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
257           ClusterStatus ncs = ClusterStatus.convert(csp);
258           receive(ncs);
259         } finally {
260           bis.close();
261         }
262       }
263     }
264   }
265 }