001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import java.io.Closeable; 023import java.io.IOException; 024import java.lang.reflect.Constructor; 025import java.lang.reflect.InvocationTargetException; 026import java.net.InetAddress; 027import java.net.NetworkInterface; 028import java.net.UnknownHostException; 029import java.util.ArrayList; 030import java.util.List; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.ClusterMetricsBuilder; 034import org.apache.hadoop.hbase.HBaseInterfaceAudience; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.util.Addressing; 038import org.apache.hadoop.hbase.util.ExceptionUtil; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 041import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 042import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; 043import 
org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A class that receives the cluster status and provides it as a set of services to the client.
 * Today, it manages only the dead server list.
 * The way the status is received is pluggable through the {@link Listener} class passed to the
 * constructor, to allow multiple implementations; the multicast based {@link MulticastListener}
 * is the default ({@link #DEFAULT_STATUS_LISTENER_CLASS}).
 */
@InterfaceAudience.Private
class ClusterStatusListener implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class);

  /**
   * Dead servers learned so far. Appended to by {@link #receive(ClusterMetrics)}, which the
   * multicast listener invokes from its Netty event-loop thread, and iterated by
   * {@link #isDeadServer(ServerName)}, which clients may call from their own threads. A
   * copy-on-write list lets readers iterate a stable snapshot without risking a
   * ConcurrentModificationException; writes are presumably rare (one per newly dead server).
   */
  private final List<ServerName> deadServers =
      new java.util.concurrent.CopyOnWriteArrayList<>();
  protected final DeadServerHandler deadServerHandler;
  private final Listener listener;

  /**
   * The implementation class to use to read the status.
   */
  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
      MulticastListener.class;

  /**
   * Class to be extended to manage a new dead server.
   */
  public interface DeadServerHandler {

    /**
     * Called when a server is identified as dead. Called only once even if we receive the
     * information multiple times.
     *
     * @param sn - the server name
     */
    void newDead(ServerName sn);
  }


  /**
   * The interface to be implemented by a listener of a cluster status event.
   */
  interface Listener extends Closeable {
    /**
     * Called to close the resources, if any. Cannot throw an exception.
     */
    @Override
    void close();

    /**
     * Called to connect.
     *
     * @param conf Configuration to use.
     * @throws IOException if failing to connect
     */
    void connect(Configuration conf) throws IOException;
  }

  /**
   * Creates the listener reflectively and connects it.
   *
   * @param dsh handler notified of newly dead servers; may be null
   * @param conf configuration handed to {@link Listener#connect(Configuration)}
   * @param listenerClass implementation to instantiate; it must expose a public constructor
   *          taking the enclosing {@code ClusterStatusListener} (as a non-static inner class
   *          with a public no-arg constructor does)
   * @throws IOException if the listener cannot be instantiated or fails to connect
   * @throws IllegalStateException if the listener class has no suitable constructor or its
   *           constructor throws
   */
  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
      Class<? extends Listener> listenerClass) throws IOException {
    this.deadServerHandler = dsh;
    try {
      Constructor<? extends Listener> ctor =
          listenerClass.getConstructor(ClusterStatusListener.class);
      this.listener = ctor.newInstance(this);
    } catch (InstantiationException | IllegalAccessException e) {
      throw new IOException("Can't create listener " + listenerClass.getName(), e);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException("Listener " + listenerClass.getName()
          + " has no public constructor taking a ClusterStatusListener", e);
    } catch (InvocationTargetException e) {
      // Unwrap so the caller sees what the listener's constructor actually threw.
      throw new IllegalStateException(
          "Constructor of listener " + listenerClass.getName() + " threw an exception",
          e.getCause());
    }

    try {
      this.listener.connect(conf);
    } catch (IOException | RuntimeException e) {
      // Release whatever the listener allocated (threads, sockets) before propagating;
      // Listener.close() is documented not to throw.
      this.listener.close();
      throw e;
    }
  }

  /**
   * Acts upon the reception of a new cluster status: every dead server not already known is
   * recorded and reported once to the {@link DeadServerHandler}, if any.
   *
   * @param ncs the cluster status
   */
  public void receive(ClusterMetrics ncs) {
    List<ServerName> reportedDead = ncs.getDeadServerNames();
    if (reportedDead == null) {
      return;
    }
    for (ServerName sn : reportedDead) {
      if (!isDeadServer(sn)) {
        LOG.info("There is a new dead server: {}", sn);
        deadServers.add(sn);
        if (deadServerHandler != null) {
          deadServerHandler.newDead(sn);
        }
      }
    }
  }

  @Override
  public void close() {
    listener.close();
  }

  /**
   * Check if we know if a server is dead. A server is reported dead when a known dead entry has
   * the same host name and port and a start code greater than or equal to {@code sn}'s.
   *
   * @param sn the server name to check.
   * @return true if we know for sure that the server is dead, false otherwise (including when
   *         {@code sn} has no positive start code).
   */
  public boolean isDeadServer(ServerName sn) {
    if (sn.getStartcode() <= 0) {
      return false;
    }

    for (ServerName dead : deadServers) {
      if (dead.getStartcode() >= sn.getStartcode() &&
          dead.getPort() == sn.getPort() &&
          dead.getHostname().equals(sn.getHostname())) {
        return true;
      }
    }

    return false;
  }


  /**
   * An implementation using a multicast message between the master & the client.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  class MulticastListener implements Listener {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(1,
        new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
            .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    public MulticastListener() {
    }

    /**
     * Binds a UDP channel and joins the configured multicast group. On any failure the
     * channel and event loop are released before the exception propagates.
     *
     * @param conf configuration supplying the multicast address, bind address, port and
     *          optional network interface name
     * @throws IOException if the multicast address or network interface cannot be resolved,
     *           or binding fails; an interrupted bind surfaces as an interrupt-derived exception
     */
    @Override
    public void connect(Configuration conf) throws IOException {

      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      try {
        Bootstrap b = new Bootstrap();
        b.group(group)
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.SO_REUSEADDR, true)
            .handler(new ClusterStatusHandler());
        channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }

      try {
        NetworkInterface ni = findNetworkInterface(niName);
        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
        channel.joinGroup(ina, ni, null, channel.newPromise());
      } catch (IOException | RuntimeException e) {
        // The channel is already bound: release it and the event loop before propagating.
        close();
        throw e;
      }
    }

    /**
     * Resolves the interface to join the multicast group on: the named one when
     * {@code niName} is set, otherwise the one owning this host's IP address.
     *
     * @throws IOException if the lookup fails or no matching interface exists
     */
    private NetworkInterface findNetworkInterface(String niName) throws IOException {
      NetworkInterface ni;
      if (niName != null) {
        ni = NetworkInterface.getByName(niName);
      } else {
        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
      }
      if (ni == null) {
        // Both JDK lookups return null (rather than throwing) when nothing matches.
        throw new IOException(niName != null
            ? "Can't find network interface " + niName
            : "Can't find a network interface for the local IP address");
      }
      return ni;
    }


    @Override
    public void close() {
      if (channel != null) {
        channel.close();
        channel = null;
      }
      group.shutdownGracefully();
    }


    /**
     * Class, conforming to the Netty framework, that manages the message received.
     */
    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.error("Unexpected exception, continuing.", cause);
      }

      /**
       * Decodes the datagram payload as a protobuf ClusterStatus and forwards it to
       * {@link ClusterStatusListener#receive(ClusterMetrics)}.
       */
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
        try (ByteBufInputStream bis = new ByteBufInputStream(dp.content())) {
          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
          ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp);
          receive(ncs);
        }
      }
    }
  }
}