/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
 * Class to publish the cluster status to the clients. This allows them to know immediately
 * which region servers are dead, so they can cut the connections they hold to those servers and
 * stop waiting on the sockets. This improves the mean time to recovery, and also lets the
 * clients use larger timeouts, since dead servers are detected through this separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  /**
   * The implementation class used to publish the status. Default is null (no publish).
   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
   * status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
          org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
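
  // Illustrative sketch, not part of the original source: enabling this chore amounts to pointing
  // STATUS_PUBLISHER_CLASS at a Publisher implementation in the master configuration. The values
  // below are examples only; MulticastPublisher is the implementation shipped in this class, and
  // DEFAULT_STATUS_PUBLISH_PERIOD is what is used anyway when the period key is left unset.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
  //       ClusterStatusPublisher.MulticastPublisher.class.getName());
  //   conf.setInt(ClusterStatusPublisher.STATUS_PUBLISH_PERIOD,
  //       ClusterStatusPublisher.DEFAULT_STATUS_PUBLISH_PERIOD);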

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent so that it fits into a single packet.
   * A reasonable size for IP / Ethernet is less than 1Kb.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we're sending the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;
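
  // Illustrative arithmetic, not part of the original source: a dead server is included in at
  // most NB_SEND successive messages, and messages are spaced at least messagePeriod apart, so
  // with the defaults (NB_SEND = 5, period = 10000 ms) and fewer than MAX_SERVER_PER_MESSAGE dead
  // servers, a freshly dead server keeps being re-announced for roughly 50 seconds before it is
  // dropped from the lastSent map.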

  public ClusterStatusPublisher(HMaster master, Configuration conf,
      Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
        STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    // hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
        .setHBaseVersion(VersionInfo.getVersion())
        .setClusterId(master.getMasterFileSystem().getClusterId().toString())
        .setMasterName(master.getServerName())
        .setDeadServerNames(sns)
        .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at most
   * MAX_SERVER_PER_MESSAGE at a time. If there are too many dead servers, we send the newly
   * dead first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We're getting the servers that died recently and adding them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We're sending the new deads first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
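
  // Illustrative example, not part of the original source: if 12 servers just died and 3 older
  // deaths have each been announced twice, the 12 new entries sort first (send count 0), so the
  // next message carries 10 of them (MAX_SERVER_PER_MESSAGE). The remaining 2 new ones lead the
  // following run, and every entry is retired from lastSent once it has been sent NB_SEND times.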

  /**
   * Get the servers which died since a given timestamp.
   * Protected so it can be overridden by the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }


  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      // Pick the IP family from the multicast address, and the network interface to publish on.
      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }

      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));

      try {
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }
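
    // Illustrative note, not part of the original source: with the pipeline configured above,
    // each publish() call writes one ClusterMetrics object that ClusterMetricsEncoder (below)
    // turns into a single UDP datagram carrying the protobuf-serialized ClusterStatus. A process
    // that has joined the same multicast group could recover the metrics along the lines of
    // ClusterStatusProtos.ClusterStatus.parseFrom(payload) followed by
    // ClusterMetricsBuilder.toClusterMetrics(...); those receiver-side calls are assumptions
    // here, not something this class provides.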

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
        extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
          ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
            ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}