001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
021import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.lang.reflect.Constructor;
026import java.lang.reflect.InvocationTargetException;
027import java.net.InetAddress;
028import java.net.NetworkInterface;
029import java.net.UnknownHostException;
030import java.util.ArrayList;
031import java.util.List;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.ClusterMetricsBuilder;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.util.Addressing;
039import org.apache.hadoop.hbase.util.ExceptionUtil;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
047import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
048import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
049import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
050import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
051import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
052import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
053import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
054import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
055import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
058
059/**
060 * A class that receives the cluster status, and provide it as a set of service to the client.
061 * Today, manages only the dead server list. The class is abstract to allow multiple
062 * implementations, from ZooKeeper to multicast based.
063 */
064@InterfaceAudience.Private
065class ClusterStatusListener implements Closeable {
066  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class);
067  private final List<ServerName> deadServers = new ArrayList<>();
068  protected final DeadServerHandler deadServerHandler;
069  private final Listener listener;
070
071  /**
072   * The implementation class to use to read the status.
073   */
074  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
075  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
076    MulticastListener.class;
077
078  /**
079   * Class to be extended to manage a new dead server.
080   */
081  public interface DeadServerHandler {
082
083    /**
084     * Called when a server is identified as dead. Called only once even if we receive the
085     * information multiple times.
086     * @param sn - the server name
087     */
088    void newDead(ServerName sn);
089  }
090
091  /**
092   * The interface to be implemented by a listener of a cluster status event.
093   */
094  interface Listener extends Closeable {
095    /**
096     * Called to close the resources, if any. Cannot throw an exception.
097     */
098    @Override
099    void close();
100
101    /**
102     * Called to connect.
103     * @param conf Configuration to use.
104     * @throws IOException if failing to connect
105     */
106    void connect(Configuration conf) throws IOException;
107  }
108
109  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
110    Class<? extends Listener> listenerClass) throws IOException {
111    this.deadServerHandler = dsh;
112    try {
113      Constructor<? extends Listener> ctor =
114        listenerClass.getConstructor(ClusterStatusListener.class);
115      this.listener = ctor.newInstance(this);
116    } catch (InstantiationException e) {
117      throw new IOException("Can't create listener " + listenerClass.getName(), e);
118    } catch (IllegalAccessException e) {
119      throw new IOException("Can't create listener " + listenerClass.getName(), e);
120    } catch (NoSuchMethodException e) {
121      throw new IllegalStateException();
122    } catch (InvocationTargetException e) {
123      throw new IllegalStateException();
124    }
125
126    this.listener.connect(conf);
127  }
128
129  /**
130   * Acts upon the reception of a new cluster status.
131   * @param ncs the cluster status
132   */
133  public void receive(ClusterMetrics ncs) {
134    if (ncs.getDeadServerNames() != null) {
135      for (ServerName sn : ncs.getDeadServerNames()) {
136        if (!isDeadServer(sn)) {
137          LOG.info("There is a new dead server: " + sn);
138          deadServers.add(sn);
139          if (deadServerHandler != null) {
140            deadServerHandler.newDead(sn);
141          }
142        }
143      }
144    }
145  }
146
147  @Override
148  public void close() {
149    listener.close();
150  }
151
152  /**
153   * Check if we know if a server is dead.
154   * @param sn the server name to check.
155   * @return true if we know for sure that the server is dead, false otherwise.
156   */
157  public boolean isDeadServer(ServerName sn) {
158    if (sn.getStartcode() <= 0) {
159      return false;
160    }
161
162    for (ServerName dead : deadServers) {
163      if (
164        dead.getStartcode() >= sn.getStartcode() && dead.getPort() == sn.getPort()
165          && dead.getHostname().equals(sn.getHostname())
166      ) {
167        return true;
168      }
169    }
170
171    return false;
172  }
173
174  /**
175   * An implementation using a multicast message between the master & the client.
176   */
177  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
178  class MulticastListener implements Listener {
179    private DatagramChannel channel;
180    private final EventLoopGroup group = new NioEventLoopGroup(1,
181      new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
182        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
183
184    public MulticastListener() {
185    }
186
187    @Override
188    public void connect(Configuration conf) throws IOException {
189      String mcAddress =
190        conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
191      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
192        HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
193      int port =
194        conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT);
195      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
196
197      InetAddress ina;
198      try {
199        ina = InetAddress.getByName(mcAddress);
200      } catch (UnknownHostException e) {
201        close();
202        throw new IOException("Can't connect to " + mcAddress, e);
203      }
204
205      try {
206        Bootstrap b = new Bootstrap();
207        b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_REUSEADDR, true)
208          .handler(new ClusterStatusHandler());
209        channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel();
210      } catch (InterruptedException e) {
211        close();
212        throw ExceptionUtil.asInterrupt(e);
213      }
214
215      NetworkInterface ni;
216      if (niName != null) {
217        ni = NetworkInterface.getByName(niName);
218      } else {
219        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
220      }
221
222      LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
223      try {
224        consume(channel.joinGroup(ina, ni, null).sync());
225      } catch (InterruptedException e) {
226        close();
227        throw ExceptionUtil.asInterrupt(e);
228      }
229    }
230
231    @Override
232    public void close() {
233      if (channel != null) {
234        safeClose(channel);
235        channel = null;
236      }
237      consume(group.shutdownGracefully());
238    }
239
240    /**
241     * Class, conforming to the Netty framework, that manages the message received.
242     */
243    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
244
245      @Override
246      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
247        LOG.error("Unexpected exception, continuing.", cause);
248      }
249
250      @Override
251      public boolean acceptInboundMessage(Object msg) throws Exception {
252        return super.acceptInboundMessage(msg);
253      }
254
255      @Override
256      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
257        ByteBufInputStream bis = new ByteBufInputStream(dp.content());
258        try {
259          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
260          ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp);
261          receive(ncs);
262        } finally {
263          bis.close();
264        }
265      }
266    }
267  }
268}