/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class to publish the cluster status to the client. This allows clients to know immediately
 * which region servers are dead, so they can cut their connections to those servers instead of
 * waiting for a socket timeout. This improves the mean time to recover, and also allows the
 * various client-side timeouts to be increased, since dead servers are detected through a
 * separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
  /**
   * The implementation class used to publish the status. Default is null (no publish).
   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
   * status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
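
  // Illustrative sketch (commented out; not part of this class): how a deployment might select
  // and tune the publisher through Configuration. Whether the master creates this chore at all
  // is controlled elsewhere, by the boolean key HConstants.STATUS_PUBLISHED
  // ("hbase.status.published"); the other keys below are the constants defined above.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(HConstants.STATUS_PUBLISHED, true);
  //   conf.set(STATUS_PUBLISHER_CLASS, MulticastPublisher.class.getName());
  //   conf.setInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);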

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, to fit into a single packet.
   * A reasonable size for IP / Ethernet is less than 1Kb.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;

  public ClusterStatusPublisher(HMaster master, Configuration conf,
      Class<? extends Publisher> publisherClass)
      throws IOException {
    super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
        STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  @Override
  public String toString() {
    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    // hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
        .setHBaseVersion(VersionInfo.getVersion())
        .setClusterId(master.getMasterFileSystem().getClusterId().toString())
        .setMasterName(master.getServerName())
        .setDeadServerNames(sns)
        .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at
   * most MAX_SERVER_PER_MESSAGE at a time. If there are too many dead servers, we send the
   * newly dead first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // Get the servers that died since the last message, and add them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We're sending the newly dead servers first: sort by how often they were already sent.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
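
  // Worked example with hypothetical numbers: if 12 servers die at once, the first chore run
  // sends 10 of them (MAX_SERVER_PER_MESSAGE) and bumps their counters in lastSent to 1. The
  // next run sorts by counter, so the 2 servers still at 0 go out first, followed by 8 of the
  // others. A server is removed from lastSent once its counter has reached NB_SEND - 1 when it
  // is picked, i.e. after it has been included in NB_SEND (5) messages in total.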

  /**
   * Get the servers which died since a given timestamp.
   * Protected so that it can be overridden by the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }
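
  // Minimal sketch of an alternative Publisher, to illustrate the contract above. This class is
  // not part of HBase; it is purely illustrative, skips networking entirely, and logs each
  // status instead of multicasting it. It could be selected via the STATUS_PUBLISHER_CLASS key.
  //
  //   public static class LogPublisher implements Publisher {
  //     @Override
  //     public void connect(Configuration conf) {
  //       // Nothing to set up: no socket to bind, no multicast group to join.
  //     }
  //
  //     @Override
  //     public void publish(ClusterMetrics cs) {
  //       LOG.info("Cluster status: dead servers={}", cs.getDeadServerNames());
  //     }
  //
  //     @Override
  //     public void close() {
  //     }
  //   }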

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public String toString() {
      return "channel=" + this.channel;
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }
      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }
      Bootstrap b = new Bootstrap();
      b.group(group)
          .channelFactory(
              new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
          .option(ChannelOption.SO_REUSEADDR, true)
          .handler(new ClusterMetricsEncoder(isa));
      try {
        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
        // Set the interface name into the configuration in case many networks are available.
        // Do this for tests so that server and client use the same interface (presuming they
        // share the same Configuration). TestAsyncTableRSCrashPublish was failing when connected
        // to a VPN because extra networks were available, with the master binding on one
        // interface and the client on another.
        if (ni != null) {
          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
        }
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }
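
    // For orientation, the receiving side in HBase lives in the client package
    // (ClusterStatusListener). A hand-rolled receiver mirroring connect() would look roughly
    // like the sketch below (plain Netty; mcAddress, port, and ni are assumed to be resolved
    // as above; this is not the actual HBase listener code):
    //
    //   Bootstrap b = new Bootstrap();
    //   b.group(new NioEventLoopGroup(1))
    //       .channel(NioDatagramChannel.class)
    //       .option(ChannelOption.SO_REUSEADDR, true)
    //       .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
    //         @Override
    //         protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
    //           // Decode the ClusterStatus protobuf from packet.content() here.
    //         }
    //       });
    //   DatagramChannel ch = (DatagramChannel) b.bind(port).sync().channel();
    //   ch.joinGroup(InetAddress.getByName(mcAddress), ni, null, ch.newPromise()).sync();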

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
        extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
          ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
            ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      LOG.info("PUBLISH {}", cs);
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}