001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
022import java.io.Closeable;
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.lang.reflect.InvocationTargetException;
026import java.net.InetAddress;
027import java.net.NetworkInterface;
028import java.net.UnknownHostException;
029import java.util.ArrayList;
030import java.util.List;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.ClusterMetricsBuilder;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.util.Addressing;
038import org.apache.hadoop.hbase.util.ExceptionUtil;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
041import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
042import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
043import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
044import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
045import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
046import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
047import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
048import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
049import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
050import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * A class that receives the cluster status, and provide it as a set of service to the client.
058 * Today, manages only the dead server list.
059 * The class is abstract to allow multiple implementations, from ZooKeeper to multicast based.
060 */
061@InterfaceAudience.Private
062class ClusterStatusListener implements Closeable {
063  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class);
064  private final List<ServerName> deadServers = new ArrayList<>();
065  protected final DeadServerHandler deadServerHandler;
066  private final Listener listener;
067
068  /**
069   * The implementation class to use to read the status.
070   */
071  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
072  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
073      MulticastListener.class;
074
075  /**
076   * Class to be extended to manage a new dead server.
077   */
078  public interface DeadServerHandler {
079
080    /**
081     * Called when a server is identified as dead. Called only once even if we receive the
082     * information multiple times.
083     *
084     * @param sn - the server name
085     */
086    void newDead(ServerName sn);
087  }
088
089
090  /**
091   * The interface to be implemented by a listener of a cluster status event.
092   */
093  interface Listener extends Closeable {
094    /**
095     * Called to close the resources, if any. Cannot throw an exception.
096     */
097    @Override
098    void close();
099
100    /**
101     * Called to connect.
102     *
103     * @param conf Configuration to use.
104     * @throws IOException if failing to connect
105     */
106    void connect(Configuration conf) throws IOException;
107  }
108
109  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
110                               Class<? extends Listener> listenerClass) throws IOException {
111    this.deadServerHandler = dsh;
112    try {
113      Constructor<? extends Listener> ctor =
114          listenerClass.getConstructor(ClusterStatusListener.class);
115      this.listener = ctor.newInstance(this);
116    } catch (InstantiationException e) {
117      throw new IOException("Can't create listener " + listenerClass.getName(), e);
118    } catch (IllegalAccessException e) {
119      throw new IOException("Can't create listener " + listenerClass.getName(), e);
120    } catch (NoSuchMethodException e) {
121      throw new IllegalStateException();
122    } catch (InvocationTargetException e) {
123      throw new IllegalStateException();
124    }
125
126    this.listener.connect(conf);
127  }
128
129  /**
130   * Acts upon the reception of a new cluster status.
131   *
132   * @param ncs the cluster status
133   */
134  public void receive(ClusterMetrics ncs) {
135    if (ncs.getDeadServerNames() != null) {
136      for (ServerName sn : ncs.getDeadServerNames()) {
137        if (!isDeadServer(sn)) {
138          LOG.info("There is a new dead server: " + sn);
139          deadServers.add(sn);
140          if (deadServerHandler != null) {
141            deadServerHandler.newDead(sn);
142          }
143        }
144      }
145    }
146  }
147
148  @Override
149  public void close() {
150    listener.close();
151  }
152
153  /**
154   * Check if we know if a server is dead.
155   *
156   * @param sn the server name to check.
157   * @return true if we know for sure that the server is dead, false otherwise.
158   */
159  public boolean isDeadServer(ServerName sn) {
160    if (sn.getStartcode() <= 0) {
161      return false;
162    }
163
164    for (ServerName dead : deadServers) {
165      if (dead.getStartcode() >= sn.getStartcode() &&
166          dead.getPort() == sn.getPort() &&
167          dead.getHostname().equals(sn.getHostname())) {
168        return true;
169      }
170    }
171
172    return false;
173  }
174
175
176  /**
177   * An implementation using a multicast message between the master & the client.
178   */
179  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
180  class MulticastListener implements Listener {
181    private DatagramChannel channel;
182    private final EventLoopGroup group = new NioEventLoopGroup(1,
183      new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
184        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
185
186    public MulticastListener() {
187    }
188
189    @Override
190    public void connect(Configuration conf) throws IOException {
191
192      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
193          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
194      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
195        HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
196      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
197          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
198      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
199
200      InetAddress ina;
201      try {
202        ina = InetAddress.getByName(mcAddress);
203      } catch (UnknownHostException e) {
204        close();
205        throw new IOException("Can't connect to " + mcAddress, e);
206      }
207
208      try {
209        Bootstrap b = new Bootstrap();
210        b.group(group)
211          .channel(NioDatagramChannel.class)
212          .option(ChannelOption.SO_REUSEADDR, true)
213          .handler(new ClusterStatusHandler());
214        channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
215      } catch (InterruptedException e) {
216        close();
217        throw ExceptionUtil.asInterrupt(e);
218      }
219
220      NetworkInterface ni;
221      if (niName != null) {
222        ni = NetworkInterface.getByName(niName);
223      } else {
224        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
225      }
226
227      LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
228      channel.joinGroup(ina, ni, null, channel.newPromise());
229    }
230
231
232    @Override
233    public void close() {
234      if (channel != null) {
235        channel.close();
236        channel = null;
237      }
238      group.shutdownGracefully();
239    }
240
241
242
243    /**
244     * Class, conforming to the Netty framework, that manages the message received.
245     */
246    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
247
248      @Override
249      public void exceptionCaught(
250          ChannelHandlerContext ctx, Throwable cause)
251          throws Exception {
252        LOG.error("Unexpected exception, continuing.", cause);
253      }
254
255      @Override
256      public boolean acceptInboundMessage(Object msg) throws Exception {
257        return super.acceptInboundMessage(msg);
258      }
259
260
261      @Override
262      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
263        ByteBufInputStream bis = new ByteBufInputStream(dp.content());
264        try {
265          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
266          ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp);
267          receive(ncs);
268        } finally {
269          bis.close();
270        }
271      }
272    }
273  }
274}