001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
022import java.io.Closeable;
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.lang.reflect.InvocationTargetException;
026import java.net.InetAddress;
027import java.net.NetworkInterface;
028import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.ClusterMetricsBuilder;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.util.Addressing;
038import org.apache.hadoop.hbase.util.ExceptionUtil;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
041import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
042import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
043import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
045import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
046import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
047import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
048import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
049import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
 * A class that receives the cluster status and provides it as a set of services to the client.
 * Today, it manages only the dead server list.
 * The {@link Listener} transport is pluggable (see {@link #STATUS_LISTENER_CLASS}), allowing
 * multiple implementations, from ZooKeeper-based to multicast-based.
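 * <p>
 * A minimal usage sketch (illustrative only; the class is package-private, so real callers are
 * the connection internals, and IOException handling is omitted). Nothing is received unless
 * the master actually publishes the status:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * ClusterStatusListener listener = new ClusterStatusListener(
 *     sn -> System.out.println("Server reported dead: " + sn),   // DeadServerHandler as a lambda
 *     conf,
 *     ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS);
 * // ... the listener now maintains its dead server list in the background ...
 * listener.close();
 * }</pre>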
059 */
060@InterfaceAudience.Private
061class ClusterStatusListener implements Closeable {
062  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class);
  // Written by the listener's event-loop thread and read by client threads, so a thread-safe
  // copy-on-write list is used (reads vastly outnumber writes).
  private final List<ServerName> deadServers = new CopyOnWriteArrayList<>();
064  protected final DeadServerHandler deadServerHandler;
065  private final Listener listener;
066
067  /**
   * Configuration key naming the {@link Listener} implementation used to read the status.
069   */
070  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
071  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
072      MulticastListener.class;
073
074  /**
   * Interface to implement in order to be notified when a new dead server is detected.
076   */
077  public interface DeadServerHandler {
078
079    /**
080     * Called when a server is identified as dead. Called only once even if we receive the
081     * information multiple times.
082     *
083     * @param sn - the server name
084     */
085    void newDead(ServerName sn);
086  }
087
088
089  /**
090   * The interface to be implemented by a listener of a cluster status event.
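   * <p>
   * Implementations are instantiated reflectively by the {@link ClusterStatusListener}
   * constructor, which looks up a public constructor taking the enclosing
   * {@code ClusterStatusListener} (for a non-static inner class such as
   * {@link MulticastListener}, that is the implicit outer-instance parameter). A skeletal sketch
   * of a hypothetical alternative transport, with the real work left as comments:
   * <pre>{@code
   * class MyListener implements ClusterStatusListener.Listener {
   *   private final ClusterStatusListener parent;
   *
   *   public MyListener(ClusterStatusListener parent) {
   *     this.parent = parent;
   *   }
   *
   *   public void connect(Configuration conf) throws IOException {
   *     // subscribe to the transport, decode ClusterMetrics, feed them to parent.receive(...)
   *   }
   *
   *   public void close() {
   *     // release the transport resources
   *   }
   * }
   * }</pre>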
091   */
092  interface Listener extends Closeable {
093    /**
094     * Called to close the resources, if any. Cannot throw an exception.
095     */
096    @Override
097    void close();
098
099    /**
100     * Called to connect.
101     *
102     * @param conf Configuration to use.
103     * @throws IOException if failing to connect
104     */
105    void connect(Configuration conf) throws IOException;
106  }
107
108  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
109                               Class<? extends Listener> listenerClass) throws IOException {
110    this.deadServerHandler = dsh;
111    try {
112      Constructor<? extends Listener> ctor =
113          listenerClass.getConstructor(ClusterStatusListener.class);
114      this.listener = ctor.newInstance(this);
    } catch (InstantiationException | IllegalAccessException e) {
      throw new IOException("Can't create listener " + listenerClass.getName(), e);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException("Listener " + listenerClass.getName() +
          " must expose a public constructor taking a ClusterStatusListener", e);
    } catch (InvocationTargetException e) {
      throw new IllegalStateException("Can't create listener " + listenerClass.getName(), e);
    }
124
125    this.listener.connect(conf);
126  }
127
128  /**
129   * Acts upon the reception of a new cluster status.
130   *
131   * @param ncs the cluster status
132   */
133  public void receive(ClusterMetrics ncs) {
134    if (ncs.getDeadServerNames() != null) {
135      for (ServerName sn : ncs.getDeadServerNames()) {
136        if (!isDeadServer(sn)) {
137          LOG.info("There is a new dead server: " + sn);
138          deadServers.add(sn);
139          if (deadServerHandler != null) {
140            deadServerHandler.newDead(sn);
141          }
142        }
143      }
144    }
145  }
146
147  @Override
148  public void close() {
149    listener.close();
150  }
151
152  /**
   * Check if we know if a server is dead. A server is considered dead if its host and port
   * match a known dead server whose start code is greater than or equal to the queried start
   * code, i.e. the queried server is the same or an earlier incarnation.
   *
155   * @param sn the server name to check.
156   * @return true if we know for sure that the server is dead, false otherwise.
157   */
158  public boolean isDeadServer(ServerName sn) {
159    if (sn.getStartcode() <= 0) {
160      return false;
161    }
162
163    for (ServerName dead : deadServers) {
164      if (dead.getStartcode() >= sn.getStartcode() &&
165          dead.getPort() == sn.getPort() &&
166          dead.getHostname().equals(sn.getHostname())) {
167        return true;
168      }
169    }
170
171    return false;
172  }
173
174
175  /**
   * An implementation that listens for the multicast cluster status messages published by the
   * master.
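   * <p>
   * Client-side configuration sketch (the keys are referenced through their {@link HConstants}
   * names; the values below are illustrative, not recommendations):
   * <pre>{@code
   * conf.set(HConstants.STATUS_MULTICAST_ADDRESS, "226.1.1.3");      // multicast group to join
   * conf.setInt(HConstants.STATUS_MULTICAST_PORT, 16100);            // UDP port to bind
   * conf.set(HConstants.STATUS_MULTICAST_BIND_ADDRESS, "0.0.0.0");   // local bind address
   * conf.set(HConstants.STATUS_MULTICAST_NI_NAME, "eth0");           // optional: network interface
   * }</pre>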
177   */
178  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
179  class MulticastListener implements Listener {
180    private DatagramChannel channel;
181    private final EventLoopGroup group = new NioEventLoopGroup(
182        1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
183
    // Explicit public constructor so that the reflective
    // getConstructor(ClusterStatusListener.class) lookup in the enclosing class can find it.
    public MulticastListener() {
    }
186
187    @Override
188    public void connect(Configuration conf) throws IOException {
189
190      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
191          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
192      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
193        HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
194      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
195          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
196      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
197
198      InetAddress ina;
199      try {
200        ina = InetAddress.getByName(mcAddress);
201      } catch (UnknownHostException e) {
202        close();
203        throw new IOException("Can't connect to " + mcAddress, e);
204      }
205
206      try {
207        Bootstrap b = new Bootstrap();
208        b.group(group)
209          .channel(NioDatagramChannel.class)
210          .option(ChannelOption.SO_REUSEADDR, true)
211          .handler(new ClusterStatusHandler());
212        channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
213      } catch (InterruptedException e) {
214        close();
215        throw ExceptionUtil.asInterrupt(e);
216      }
217
218      NetworkInterface ni;
219      if (niName != null) {
220        ni = NetworkInterface.getByName(niName);
221      } else {
222        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
223      }
224
225      LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
226      channel.joinGroup(ina, ni, null, channel.newPromise());
227    }
228
229
230    @Override
231    public void close() {
232      if (channel != null) {
233        channel.close();
234        channel = null;
235      }
236      group.shutdownGracefully();
237    }
238
239
240
241    /**
     * Netty handler that decodes received multicast packets into cluster status updates.
243     */
244    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
245
246      @Override
247      public void exceptionCaught(
248          ChannelHandlerContext ctx, Throwable cause)
249          throws Exception {
250        LOG.error("Unexpected exception, continuing.", cause);
251      }
252
253      @Override
254      public boolean acceptInboundMessage(Object msg) throws Exception {
255        return super.acceptInboundMessage(msg);
256      }
257
258
259      @Override
260      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
        try (ByteBufInputStream bis = new ByteBufInputStream(dp.content())) {
          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
          ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp);
          receive(ncs);
        }
269      }
270    }
271  }
272}