/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
 * Class to publish the cluster status to the clients. This allows clients to learn immediately
 * of dead region servers, hence to cut the connections they have with them and stop waiting on
 * the sockets. This improves the mean time to recovery, and also allows clients to use larger
 * timeouts, since dead servers are detected through a separate channel.
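 * <p>
 * A minimal configuration sketch, assuming status publication is switched on for the master
 * (the values shown are illustrative; note the {@code $} binary name used to load the inner
 * class by name):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.setBoolean("hbase.status.published", true);
 * conf.set("hbase.status.publisher.class",
 *   "org.apache.hadoop.hbase.master.ClusterStatusPublisher$MulticastPublisher");
 * conf.setInt("hbase.status.publish.period", 10000); // milliseconds
 * </pre>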
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
  /**
   * The implementation class used to publish the status. Default is null (no publication).
   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast
   * the status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
    DEFAULT_STATUS_PUBLISHER_CLASS =
    org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent so that it fits into a single
   * packet. A reasonable size for IP / Ethernet is less than 1KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;

  public ClusterStatusPublisher(HMaster master, Configuration conf,
      Class<? extends Publisher> publisherClass)
      throws IOException {
    super("ClusterStatusPublisher for=" + master.getName(), master,
      conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  @Override
  public String toString() {
    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    // hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
      .setHBaseVersion(VersionInfo.getVersion())
      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
      .setMasterName(master.getServerName())
      .setDeadServerNames(sns)
      .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at
   * most MAX_SERVER_PER_MESSAGE at a time. If there are too many dead servers, we send the
   * newly dead ones first.
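   * <p>
   * As a rough worked example with the defaults (not normative): with NB_SEND = 5 and a
   * 10 second publish period, a newly dead server appears in up to five consecutive messages,
   * i.e. it keeps being re-announced for about 40 seconds, unless servers with lower send
   * counts keep crowding it out of the MAX_SERVER_PER_MESSAGE slots.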
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We get the servers that died since the last message, and add them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We send the newly dead servers first: sort by the number of times already sent.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    entries.sort(Map.Entry.comparingByValue());

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }

  /**
   * Get the servers which died since a given timestamp. Protected so it can be overridden by
   * the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }
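  /**
   * Contract for publishing the cluster status. The master instantiates the configured class
   * through its no-argument constructor, then calls connect(Configuration) once before the
   * first publish(ClusterMetrics). A minimal sketch of a custom implementation (this
   * LoggingPublisher class is hypothetical, for illustration only):
   *
   * <pre>
   * public class LoggingPublisher implements ClusterStatusPublisher.Publisher {
   *   public void connect(Configuration conf) {
   *     // nothing to set up
   *   }
   *
   *   public void publish(ClusterMetrics cs) {
   *     LoggerFactory.getLogger(LoggingPublisher.class).info("status: {}", cs);
   *   }
   *
   *   public void close() {
   *   }
   * }
   * </pre>
   */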
  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(1,
      new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    public MulticastPublisher() {
    }

    @Override
    public String toString() {
      return "channel=" + this.channel;
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
        HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
        HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }
      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }
      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(
          new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));
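      // Bind an ephemeral UDP port on the bind address, join the multicast group on the
      // chosen interface, then connect the channel so writes default to the group address.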
      try {
        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
        // Save the interface name into the configuration in case many networks are available.
        // Do this for tests so that server and client use the same interface (presuming they
        // share the same Configuration). TestAsyncTableRSCrashPublish was failing when
        // connected to a VPN because extra networks were available, with the Master binding
        // on one interface and the client on another.
        if (ni != null) {
          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
        }
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static final class HBaseDatagramChannelFactory<T extends Channel>
      implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
      extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
          ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
          ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      LOG.info("PUBLISH {}", cs);
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}