/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
 * Class to publish the cluster status to the clients. This allows them to know immediately
 * which region servers are dead, so they can cut the connections they have with those servers
 * and stop waiting on the sockets. This improves the mean time to recover, and also allows the
 * clients to increase their various timeouts, as dead servers will be detected separately.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
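  // How publishing is typically enabled (a configuration sketch; the hbase.status.published
  // switch and the client-side org.apache.hadoop.hbase.client.ClusterStatusListener are assumed
  // here and are defined outside this file):
  //
  //   hbase.status.published        = true     (the master then schedules this chore)
  //   hbase.status.publisher.class  = org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher
  //   hbase.status.publish.period   = 10000    (milliseconds between two status messages)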
  /**
   * The implementation class used to publish the status. The default,
   * {@link MulticastPublisher}, multicasts the status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
          org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, to fit into a single packet.
   * A reasonable size for IP / Ethernet is less than 1KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;

  public ClusterStatusPublisher(HMaster master, Configuration conf,
      Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(), master,
        conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }
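  // A minimal wiring sketch (hypothetical; obtaining the ChoreService from the master is
  // assumed here and is not shown in this file):
  //
  //   ClusterStatusPublisher publisher =
  //       new ClusterStatusPublisher(master, conf, ClusterStatusPublisher.MulticastPublisher.class);
  //   master.getChoreService().scheduleChore(publisher);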
  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    // hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
      .setHBaseVersion(VersionInfo.getVersion())
      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
      .setMasterName(master.getServerName())
      .setDeadServerNames(sns)
      .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at
   * most MAX_SERVER_PER_MESSAGE servers at a time. If there are too many dead servers, we send
   * the newly dead ones first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // Get the servers that died since the last two periods, and add them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We send the newly dead servers first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
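  // Worked example (illustrative numbers only): with NB_SEND = 5 and MAX_SERVER_PER_MESSAGE = 10,
  // a newly dead server is included in up to 5 messages before being removed from lastSent. If
  // 12 servers die at once, the 10 with the lowest send count go into the current message and
  // the remaining 2 are picked up on the next chore run.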
  /**
   * Get the servers which died since a given timestamp.
   * Protected so that it can be overridden by the tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

  /**
   * Publishes the cluster metrics to interested listeners.
   */
  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }

  /**
   * Publishes the cluster status over UDP multicast.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }

      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));

      try {
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }
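      // Create the channel through its (InternetProtocolFamily) constructor rather than the
      // no-arg one, so the underlying socket is opened for the intended IP family; this matters
      // when joining an IPv6 multicast group.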
      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
        extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
          ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
          ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}