001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.lang.reflect.Constructor; 023import java.lang.reflect.InvocationTargetException; 024import java.net.InetAddress; 025import java.net.NetworkInterface; 026import java.net.UnknownHostException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.ClusterMetrics; 031import org.apache.hadoop.hbase.ClusterMetricsBuilder; 032import org.apache.hadoop.hbase.HBaseInterfaceAudience; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.util.Addressing; 036import org.apache.hadoop.hbase.util.ExceptionUtil; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 043import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 044import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; 045import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 046import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 047import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 048import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 049import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 050import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel; 051import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket; 052import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel; 053 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 055 056/** 057 * A class that receives the cluster status, and provide it as a set of service to the client. 058 * Today, manages only the dead server list. The class is abstract to allow multiple 059 * implementations, from ZooKeeper to multicast based. 060 */ 061@InterfaceAudience.Private 062class ClusterStatusListener implements Closeable { 063 private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusListener.class); 064 private final List<ServerName> deadServers = new ArrayList<>(); 065 protected final DeadServerHandler deadServerHandler; 066 private final Listener listener; 067 068 /** 069 * The implementation class to use to read the status. 070 */ 071 public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class"; 072 public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS = 073 MulticastListener.class; 074 075 /** 076 * Class to be extended to manage a new dead server. 077 */ 078 public interface DeadServerHandler { 079 080 /** 081 * Called when a server is identified as dead. Called only once even if we receive the 082 * information multiple times. 083 * @param sn - the server name 084 */ 085 void newDead(ServerName sn); 086 } 087 088 /** 089 * The interface to be implemented by a listener of a cluster status event. 090 */ 091 interface Listener extends Closeable { 092 /** 093 * Called to close the resources, if any. Cannot throw an exception. 094 */ 095 @Override 096 void close(); 097 098 /** 099 * Called to connect. 100 * @param conf Configuration to use. 101 * @throws IOException if failing to connect 102 */ 103 void connect(Configuration conf) throws IOException; 104 } 105 106 public ClusterStatusListener(DeadServerHandler dsh, Configuration conf, 107 Class<? extends Listener> listenerClass) throws IOException { 108 this.deadServerHandler = dsh; 109 try { 110 Constructor<? extends Listener> ctor = 111 listenerClass.getConstructor(ClusterStatusListener.class); 112 this.listener = ctor.newInstance(this); 113 } catch (InstantiationException e) { 114 throw new IOException("Can't create listener " + listenerClass.getName(), e); 115 } catch (IllegalAccessException e) { 116 throw new IOException("Can't create listener " + listenerClass.getName(), e); 117 } catch (NoSuchMethodException e) { 118 throw new IllegalStateException(); 119 } catch (InvocationTargetException e) { 120 throw new IllegalStateException(); 121 } 122 123 this.listener.connect(conf); 124 } 125 126 /** 127 * Acts upon the reception of a new cluster status. 128 * @param ncs the cluster status 129 */ 130 public void receive(ClusterMetrics ncs) { 131 if (ncs.getDeadServerNames() != null) { 132 for (ServerName sn : ncs.getDeadServerNames()) { 133 if (!isDeadServer(sn)) { 134 LOG.info("There is a new dead server: " + sn); 135 deadServers.add(sn); 136 if (deadServerHandler != null) { 137 deadServerHandler.newDead(sn); 138 } 139 } 140 } 141 } 142 } 143 144 @Override 145 public void close() { 146 listener.close(); 147 } 148 149 /** 150 * Check if we know if a server is dead. 151 * @param sn the server name to check. 152 * @return true if we know for sure that the server is dead, false otherwise. 153 */ 154 public boolean isDeadServer(ServerName sn) { 155 if (sn.getStartcode() <= 0) { 156 return false; 157 } 158 159 for (ServerName dead : deadServers) { 160 if ( 161 dead.getStartcode() >= sn.getStartcode() && dead.getPort() == sn.getPort() 162 && dead.getHostname().equals(sn.getHostname()) 163 ) { 164 return true; 165 } 166 } 167 168 return false; 169 } 170 171 /** 172 * An implementation using a multicast message between the master & the client. 173 */ 174 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 175 class MulticastListener implements Listener { 176 private DatagramChannel channel; 177 private final EventLoopGroup group = new NioEventLoopGroup(1, 178 new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d") 179 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 180 181 public MulticastListener() { 182 } 183 184 @Override 185 public void connect(Configuration conf) throws IOException { 186 187 String mcAddress = 188 conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS); 189 String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS, 190 HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS); 191 int port = 192 conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT); 193 String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME); 194 195 InetAddress ina; 196 try { 197 ina = InetAddress.getByName(mcAddress); 198 } catch (UnknownHostException e) { 199 close(); 200 throw new IOException("Can't connect to " + mcAddress, e); 201 } 202 203 try { 204 Bootstrap b = new Bootstrap(); 205 b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_REUSEADDR, true) 206 .handler(new ClusterStatusHandler()); 207 channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel(); 208 } catch (InterruptedException e) { 209 close(); 210 throw ExceptionUtil.asInterrupt(e); 211 } 212 213 NetworkInterface ni; 214 if (niName != null) { 215 ni = NetworkInterface.getByName(niName); 216 } else { 217 ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress()); 218 } 219 220 LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina); 221 channel.joinGroup(ina, ni, null, channel.newPromise()); 222 } 223 224 @Override 225 public void close() { 226 if (channel != null) { 227 channel.close(); 228 channel = null; 229 } 230 group.shutdownGracefully(); 231 } 232 233 /** 234 * Class, conforming to the Netty framework, that manages the message received. 235 */ 236 private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> { 237 238 @Override 239 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 240 LOG.error("Unexpected exception, continuing.", cause); 241 } 242 243 @Override 244 public boolean acceptInboundMessage(Object msg) throws Exception { 245 return super.acceptInboundMessage(msg); 246 } 247 248 @Override 249 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception { 250 ByteBufInputStream bis = new ByteBufInputStream(dp.content()); 251 try { 252 ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis); 253 ClusterMetrics ncs = ClusterMetricsBuilder.toClusterMetrics(csp); 254 receive(ncs); 255 } finally { 256 bis.close(); 257 } 258 } 259 } 260 } 261}