001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
056
057/**
058 * A class that receives the cluster status, and provide it as a set of service to the client.
059 * Today, manages only the dead server list.
060 * The class is abstract to allow multiple implementations, from ZooKeeper to multicast based.
061 */
062@InterfaceAudience.Private
063class ClusterStatusListener implements Closeable {
064  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class);
065  private final List<ServerName> deadServers = new ArrayList<>();
066  protected final DeadServerHandler deadServerHandler;
067  private final Listener listener;
068
069  /**
070   * The implementation class to use to read the status.
071   */
072  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
073  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
074      MulticastListener.class;
075
076  /**
077   * Class to be extended to manage a new dead server.
078   */
079  public interface DeadServerHandler {
080
081    /**
082     * Called when a server is identified as dead. Called only once even if we receive the
083     * information multiple times.
084     *
085     * @param sn - the server name
086     */
087    void newDead(ServerName sn);
088  }
089
090
091  /**
092   * The interface to be implemented by a listener of a cluster status event.
093   */
094  interface Listener extends Closeable {
095    /**
096     * Called to close the resources, if any. Cannot throw an exception.
097     */
098    @Override
099    void close();
100
101    /**
102     * Called to connect.
103     *
104     * @param conf Configuration to use.
105     * @throws IOException if failing to connect
106     */
107    void connect(Configuration conf) throws IOException;
108  }
109
110  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
111                               Class<? extends Listener> listenerClass) throws IOException {
112    this.deadServerHandler = dsh;
113    try {
114      Constructor<? extends Listener> ctor =
115          listenerClass.getConstructor(ClusterStatusListener.class);
116      this.listener = ctor.newInstance(this);
117    } catch (InstantiationException e) {
118      throw new IOException("Can't create listener " + listenerClass.getName(), e);
119    } catch (IllegalAccessException e) {
120      throw new IOException("Can't create listener " + listenerClass.getName(), e);
121    } catch (NoSuchMethodException e) {
122      throw new IllegalStateException();
123    } catch (InvocationTargetException e) {
124      throw new IllegalStateException();
125    }
126
127    this.listener.connect(conf);
128  }
129
130  /**
131   * Acts upon the reception of a new cluster status.
132   *
133   * @param ncs the cluster status
134   */
135  public void receive(ClusterMetrics ncs) {
136    if (ncs.getDeadServerNames() != null) {
137      for (ServerName sn : ncs.getDeadServerNames()) {
138        if (!isDeadServer(sn)) {
139          LOG.info("There is a new dead server: " + sn);
140          deadServers.add(sn);
141          if (deadServerHandler != null) {
142            deadServerHandler.newDead(sn);
143          }
144        }
145      }
146    }
147  }
148
149  @Override
150  public void close() {
151    listener.close();
152  }
153
154  /**
155   * Check if we know if a server is dead.
156   *
157   * @param sn the server name to check.
158   * @return true if we know for sure that the server is dead, false otherwise.
159   */
160  public boolean isDeadServer(ServerName sn) {
161    if (sn.getStartcode() <= 0) {
162      return false;
163    }
164
165    for (ServerName dead : deadServers) {
166      if (dead.getStartcode() >= sn.getStartcode() &&
167          dead.getPort() == sn.getPort() &&
168          dead.getHostname().equals(sn.getHostname())) {
169        return true;
170      }
171    }
172
173    return false;
174  }
175
176
177  /**
178   * An implementation using a multicast message between the master & the client.
179   */
180  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
181  class MulticastListener implements Listener {
182    private DatagramChannel channel;
183    private final EventLoopGroup group = new NioEventLoopGroup(
184        1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
185
186    public MulticastListener() {
187    }
188
189    @Override
190    public void connect(Configuration conf) throws IOException {
191
192      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
193          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
194      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
195        HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
196      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
197          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
198      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
199
200      InetAddress ina;
201      try {
202        ina = InetAddress.getByName(mcAddress);
203      } catch (UnknownHostException e) {
204        close();
205        throw new IOException("Can't connect to " + mcAddress, e);
206      }
207
208      try {
209        Bootstrap b = new Bootstrap();
210        b.group(group)
211            .channel(NioDatagramChannel.class)
212            .option(ChannelOption.SO_REUSEADDR, true)
213            .handler(new ClusterStatusHandler());
214
215        channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
216      } catch (InterruptedException e) {
217        close();
218        throw ExceptionUtil.asInterrupt(e);
219      }
220
221      NetworkInterface ni;
222      if (niName != null) {
223        ni = NetworkInterface.getByName(niName);
224      } else {
225        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
226      }
227
228      channel.joinGroup(ina, ni, null, channel.newPromise());
229    }
230
231    @Override
232    public void close() {
233      if (channel != null) {
234        channel.close();
235        channel = null;
236      }
237      group.shutdownGracefully();
238    }
239
240
241
242    /**
243     * Class, conforming to the Netty framework, that manages the message received.
244     */
245    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
246
247      @Override
248      public void exceptionCaught(
249          ChannelHandlerContext ctx, Throwable cause)
250          throws Exception {
251        LOG.error("Unexpected exception, continuing.", cause);
252      }
253
254      @Override
255      public boolean acceptInboundMessage(Object msg)
256          throws Exception {
257        return super.acceptInboundMessage(msg);
258      }
259
260
261      @Override
262      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
263        ByteBufInputStream bis = new ByteBufInputStream(dp.content());
264        try {
265          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
266          ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp);
267          receive(ncs);
268        } finally {
269          bis.close();
270        }
271      }
272    }
273  }
274}