001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume; 021import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.lang.reflect.Constructor; 026import java.lang.reflect.InvocationTargetException; 027import java.net.InetAddress; 028import java.net.NetworkInterface; 029import java.net.UnknownHostException; 030import java.util.ArrayList; 031import java.util.List; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.ClusterMetrics; 034import org.apache.hadoop.hbase.ClusterMetricsBuilder; 035import org.apache.hadoop.hbase.HBaseInterfaceAudience; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.util.Addressing; 039import org.apache.hadoop.hbase.util.ExceptionUtil; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 046import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 047import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; 048import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 049import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 050import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 051import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 052import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 053import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel; 054import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket; 055import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel; 056 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 058 059/** 060 * A class that receives the cluster status, and provide it as a set of service to the client. 061 * Today, manages only the dead server list. The class is abstract to allow multiple 062 * implementations, from ZooKeeper to multicast based. 063 */ 064@InterfaceAudience.Private 065class ClusterStatusListener implements Closeable { 066 private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class); 067 private final List<ServerName> deadServers = new ArrayList<>(); 068 protected final DeadServerHandler deadServerHandler; 069 private final Listener listener; 070 071 /** 072 * The implementation class to use to read the status. 073 */ 074 public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class"; 075 public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS = 076 MulticastListener.class; 077 078 /** 079 * Class to be extended to manage a new dead server. 080 */ 081 public interface DeadServerHandler { 082 083 /** 084 * Called when a server is identified as dead. Called only once even if we receive the 085 * information multiple times. 086 * @param sn - the server name 087 */ 088 void newDead(ServerName sn); 089 } 090 091 /** 092 * The interface to be implemented by a listener of a cluster status event. 093 */ 094 interface Listener extends Closeable { 095 /** 096 * Called to close the resources, if any. Cannot throw an exception. 097 */ 098 @Override 099 void close(); 100 101 /** 102 * Called to connect. 103 * @param conf Configuration to use. 104 * @throws IOException if failing to connect 105 */ 106 void connect(Configuration conf) throws IOException; 107 } 108 109 public ClusterStatusListener(DeadServerHandler dsh, Configuration conf, 110 Class<? extends Listener> listenerClass) throws IOException { 111 this.deadServerHandler = dsh; 112 try { 113 Constructor<? extends Listener> ctor = 114 listenerClass.getConstructor(ClusterStatusListener.class); 115 this.listener = ctor.newInstance(this); 116 } catch (InstantiationException e) { 117 throw new IOException("Can't create listener " + listenerClass.getName(), e); 118 } catch (IllegalAccessException e) { 119 throw new IOException("Can't create listener " + listenerClass.getName(), e); 120 } catch (NoSuchMethodException e) { 121 throw new IllegalStateException(); 122 } catch (InvocationTargetException e) { 123 throw new IllegalStateException(); 124 } 125 126 this.listener.connect(conf); 127 } 128 129 /** 130 * Acts upon the reception of a new cluster status. 131 * @param ncs the cluster status 132 */ 133 public void receive(ClusterMetrics ncs) { 134 if (ncs.getDeadServerNames() != null) { 135 for (ServerName sn : ncs.getDeadServerNames()) { 136 if (!isDeadServer(sn)) { 137 LOG.info("There is a new dead server: " + sn); 138 deadServers.add(sn); 139 if (deadServerHandler != null) { 140 deadServerHandler.newDead(sn); 141 } 142 } 143 } 144 } 145 } 146 147 @Override 148 public void close() { 149 listener.close(); 150 } 151 152 /** 153 * Check if we know if a server is dead. 154 * @param sn the server name to check. 155 * @return true if we know for sure that the server is dead, false otherwise. 156 */ 157 public boolean isDeadServer(ServerName sn) { 158 if (sn.getStartcode() <= 0) { 159 return false; 160 } 161 162 for (ServerName dead : deadServers) { 163 if ( 164 dead.getStartcode() >= sn.getStartcode() && dead.getPort() == sn.getPort() 165 && dead.getHostname().equals(sn.getHostname()) 166 ) { 167 return true; 168 } 169 } 170 171 return false; 172 } 173 174 /** 175 * An implementation using a multicast message between the master & the client. 176 */ 177 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 178 class MulticastListener implements Listener { 179 private DatagramChannel channel; 180 private final EventLoopGroup group = new NioEventLoopGroup(1, 181 new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d") 182 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 183 184 public MulticastListener() { 185 } 186 187 @Override 188 public void connect(Configuration conf) throws IOException { 189 String mcAddress = 190 conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS); 191 String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS, 192 HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS); 193 int port = 194 conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT); 195 String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME); 196 197 InetAddress ina; 198 try { 199 ina = InetAddress.getByName(mcAddress); 200 } catch (UnknownHostException e) { 201 close(); 202 throw new IOException("Can't connect to " + mcAddress, e); 203 } 204 205 try { 206 Bootstrap b = new Bootstrap(); 207 b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_REUSEADDR, true) 208 .handler(new ClusterStatusHandler()); 209 channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel(); 210 } catch (InterruptedException e) { 211 close(); 212 throw ExceptionUtil.asInterrupt(e); 213 } 214 215 NetworkInterface ni; 216 if (niName != null) { 217 ni = NetworkInterface.getByName(niName); 218 } else { 219 ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress()); 220 } 221 222 LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina); 223 try { 224 consume(channel.joinGroup(ina, ni, null).sync()); 225 } catch (InterruptedException e) { 226 close(); 227 throw ExceptionUtil.asInterrupt(e); 228 } 229 } 230 231 @Override 232 public void close() { 233 if (channel != null) { 234 safeClose(channel); 235 channel = null; 236 } 237 consume(group.shutdownGracefully()); 238 } 239 240 /** 241 * Class, conforming to the Netty framework, that manages the message received. 242 */ 243 private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> { 244 245 @Override 246 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 247 LOG.error("Unexpected exception, continuing.", cause); 248 } 249 250 @Override 251 public boolean acceptInboundMessage(Object msg) throws Exception { 252 return super.acceptInboundMessage(msg); 253 } 254 255 @Override 256 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception { 257 ByteBufInputStream bis = new ByteBufInputStream(dp.content()); 258 try { 259 ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis); 260 ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp); 261 receive(ncs); 262 } finally { 263 bis.close(); 264 } 265 } 266 } 267 } 268}