001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.InvocationTargetException;
024import java.net.InetAddress;
025import java.net.NetworkInterface;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ClusterMetrics;
031import org.apache.hadoop.hbase.ClusterMetricsBuilder;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.util.Addressing;
036import org.apache.hadoop.hbase.util.ExceptionUtil;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
044import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
045import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
046import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
047import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
048import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
049import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
050import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
051import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
052import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
055
056/**
057 * A class that receives the cluster status, and provide it as a set of service to the client.
058 * Today, manages only the dead server list. The class is abstract to allow multiple
059 * implementations, from ZooKeeper to multicast based.
060 */
061@InterfaceAudience.Private
062class ClusterStatusListener implements Closeable {
063  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class);
064  private final List<ServerName> deadServers = new ArrayList<>();
065  protected final DeadServerHandler deadServerHandler;
066  private final Listener listener;
067
068  /**
069   * The implementation class to use to read the status.
070   */
071  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
072  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
073    MulticastListener.class;
074
075  /**
076   * Class to be extended to manage a new dead server.
077   */
078  public interface DeadServerHandler {
079
080    /**
081     * Called when a server is identified as dead. Called only once even if we receive the
082     * information multiple times.
083     * @param sn - the server name
084     */
085    void newDead(ServerName sn);
086  }
087
088  /**
089   * The interface to be implemented by a listener of a cluster status event.
090   */
091  interface Listener extends Closeable {
092    /**
093     * Called to close the resources, if any. Cannot throw an exception.
094     */
095    @Override
096    void close();
097
098    /**
099     * Called to connect.
100     * @param conf Configuration to use.
101     * @throws IOException if failing to connect
102     */
103    void connect(Configuration conf) throws IOException;
104  }
105
106  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
107    Class<? extends Listener> listenerClass) throws IOException {
108    this.deadServerHandler = dsh;
109    try {
110      Constructor<? extends Listener> ctor =
111        listenerClass.getConstructor(ClusterStatusListener.class);
112      this.listener = ctor.newInstance(this);
113    } catch (InstantiationException e) {
114      throw new IOException("Can't create listener " + listenerClass.getName(), e);
115    } catch (IllegalAccessException e) {
116      throw new IOException("Can't create listener " + listenerClass.getName(), e);
117    } catch (NoSuchMethodException e) {
118      throw new IllegalStateException();
119    } catch (InvocationTargetException e) {
120      throw new IllegalStateException();
121    }
122
123    this.listener.connect(conf);
124  }
125
126  /**
127   * Acts upon the reception of a new cluster status.
128   * @param ncs the cluster status
129   */
130  public void receive(ClusterMetrics ncs) {
131    if (ncs.getDeadServerNames() != null) {
132      for (ServerName sn : ncs.getDeadServerNames()) {
133        if (!isDeadServer(sn)) {
134          LOG.info("There is a new dead server: " + sn);
135          deadServers.add(sn);
136          if (deadServerHandler != null) {
137            deadServerHandler.newDead(sn);
138          }
139        }
140      }
141    }
142  }
143
144  @Override
145  public void close() {
146    listener.close();
147  }
148
149  /**
150   * Check if we know if a server is dead.
151   * @param sn the server name to check.
152   * @return true if we know for sure that the server is dead, false otherwise.
153   */
154  public boolean isDeadServer(ServerName sn) {
155    if (sn.getStartcode() <= 0) {
156      return false;
157    }
158
159    for (ServerName dead : deadServers) {
160      if (
161        dead.getStartcode() >= sn.getStartcode() && dead.getPort() == sn.getPort()
162          && dead.getHostname().equals(sn.getHostname())
163      ) {
164        return true;
165      }
166    }
167
168    return false;
169  }
170
171  /**
172   * An implementation using a multicast message between the master & the client.
173   */
174  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
175  class MulticastListener implements Listener {
176    private DatagramChannel channel;
177    private final EventLoopGroup group = new NioEventLoopGroup(1,
178      new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
179        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
180
181    public MulticastListener() {
182    }
183
184    @Override
185    public void connect(Configuration conf) throws IOException {
186
187      String mcAddress =
188        conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
189      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
190        HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
191      int port =
192        conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT);
193      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
194
195      InetAddress ina;
196      try {
197        ina = InetAddress.getByName(mcAddress);
198      } catch (UnknownHostException e) {
199        close();
200        throw new IOException("Can't connect to " + mcAddress, e);
201      }
202
203      try {
204        Bootstrap b = new Bootstrap();
205        b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_REUSEADDR, true)
206          .handler(new ClusterStatusHandler());
207        channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel();
208      } catch (InterruptedException e) {
209        close();
210        throw ExceptionUtil.asInterrupt(e);
211      }
212
213      NetworkInterface ni;
214      if (niName != null) {
215        ni = NetworkInterface.getByName(niName);
216      } else {
217        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
218      }
219
220      LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
221      channel.joinGroup(ina, ni, null, channel.newPromise());
222    }
223
224    @Override
225    public void close() {
226      if (channel != null) {
227        channel.close();
228        channel = null;
229      }
230      group.shutdownGracefully();
231    }
232
233    /**
234     * Class, conforming to the Netty framework, that manages the message received.
235     */
236    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
237
238      @Override
239      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
240        LOG.error("Unexpected exception, continuing.", cause);
241      }
242
243      @Override
244      public boolean acceptInboundMessage(Object msg) throws Exception {
245        return super.acceptInboundMessage(msg);
246      }
247
248      @Override
249      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
250        ByteBufInputStream bis = new ByteBufInputStream(dp.content());
251        try {
252          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
253          ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp);
254          receive(ncs);
255        } finally {
256          bis.close();
257        }
258      }
259    }
260  }
261}