/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
 * Class to publish the cluster status to the clients. This allows the clients to know immediately
 * about dead region servers, so they can cut the connections they hold to them and stop waiting on
 * those sockets. This improves the mean time to recover, and also allows client-side timeouts to
 * be raised, as dead servers are detected through a separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  private static Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
  /**
   * The implementation class used to publish the status. Default is null (no publish). Use
   * org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
   * status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher> DEFAULT_STATUS_PUBLISHER_CLASS =
    org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent so that it fits into a single packet. A
   * reasonable size for IP / Ethernet is less than 1 KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we're sending the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;
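
  /*
   * A minimal, illustrative sketch of how this chore is typically enabled: the keys below are the
   * constants defined above, set programmatically on the master Configuration (deployments would
   * normally put the equivalent entries in hbase-site.xml, and publishing is usually also gated by
   * the hbase.status.published switch):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setClass(STATUS_PUBLISHER_CLASS, MulticastPublisher.class, Publisher.class);
   *   conf.setInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
   */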

  public ClusterStatusPublisher(HMaster master, Configuration conf,
    Class<? extends Publisher> publisherClass) throws IOException {
    super("ClusterStatusPublisher for=" + master.getName(), master,
      conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  @Override
  public String toString() {
    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    // hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder().setHBaseVersion(VersionInfo.getVersion())
      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
      .setMasterName(master.getServerName()).setDeadServerNames(sns).build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at most
   * MAX_SERVER_PER_MESSAGE at a time. If there are too many dead servers, we send the newly dead
   * ones first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We're getting the servers that died since the last message, and adding them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We're sending the newly dead servers first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
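
  // Worked example of the schedule above, assuming the default 10s publish period: a newly dead
  // server enters lastSent with a count of 0 and is then included in up to NB_SEND (5) consecutive
  // messages, i.e. it keeps being announced for roughly 50 seconds before being dropped from the
  // map. If more than MAX_SERVER_PER_MESSAGE (10) servers are pending, the least-announced
  // (most recently dead) servers are sent first.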

  /**
   * Get the servers which died since a given timestamp. Protected so that it can be overridden by
   * the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(1,
      new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    public MulticastPublisher() {
    }

    @Override
    public String toString() {
      return "channel=" + this.channel;
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress =
        conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port =
        conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }
      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }
      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true).handler(new ClusterMetricsEncoder(isa));
      try {
        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
        // Set the interface name into the configuration in case many networks are available. Do
        // this for tests so that server and client use the same interface (presuming they share
        // the same Configuration). TestAsyncTableRSCrashPublish was failing when connected to a
        // VPN because extra networks were available, with the Master binding on one interface and
        // the client on another.
        if (ni != null) {
          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
        }
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }
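
    // The channel factory below passes the InternetProtocolFamily selected in connect() to the
    // channel constructor. Using Bootstrap#channel(Class) instead would instantiate
    // NioDatagramChannel through its no-arg constructor, and joining a multicast group whose
    // address family does not match the channel's family can fail.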

    private static final class HBaseDatagramChannelFactory<T extends Channel>
      implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
            new Class[] { InternetProtocolFamily.class }, new Object[] { family });

        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
      extends MessageToMessageEncoder<ClusterMetrics> {
      final private InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
        ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled
          .wrappedBuffer(ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      LOG.info("PUBLISH {}", cs);
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}