001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.net.Inet6Address;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.NetworkInterface;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.Collections;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.ClusterMetricsBuilder;
037import org.apache.hadoop.hbase.HBaseInterfaceAudience;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.ScheduledChore;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.util.Addressing;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.hbase.util.ExceptionUtil;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.hadoop.hbase.util.ReflectionUtils;
046import org.apache.hadoop.hbase.util.Threads;
047import org.apache.hadoop.hbase.util.VersionInfo;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
053import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
054import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
055import org.apache.hbase.thirdparty.io.netty.channel.Channel;
056import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
057import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
058import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
060import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
061import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
062import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
063import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
064import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
065import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
066import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
067import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
068
069/**
070 * Class to publish the cluster status to the client. This allows them to know immediately the dead
071 * region servers, hence to cut the connection they have with them, eventually stop waiting on the
072 * socket. This improves the mean time to recover, and as well allows to increase on the client the
073 * different timeouts, as the dead servers will be detected separately.
074 */
075@InterfaceAudience.Private
076public class ClusterStatusPublisher extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
078  /**
079   * The implementation class used to publish the status. Default is null (no publish). Use
080   * org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
081   * status.
082   */
083  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
084  public static final Class<
085    ? extends ClusterStatusPublisher.Publisher> DEFAULT_STATUS_PUBLISHER_CLASS =
086      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;
087
088  /**
089   * The minimum time between two status messages, in milliseconds.
090   */
091  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
092  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
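  /*
   * Illustrative sketch (not part of this class): status publishing is driven by
   * configuration. The "hbase.status.published" flag below is assumed to be the
   * master-side switch (see HConstants); the other keys are the constants above.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setBoolean("hbase.status.published", true);
   *   conf.set(STATUS_PUBLISHER_CLASS, DEFAULT_STATUS_PUBLISHER_CLASS.getName());
   *   conf.setInt(STATUS_PUBLISH_PERIOD, 10000); // at most one message every 10s
   */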
093
094  private long lastMessageTime = 0;
095  private final HMaster master;
  private final int messagePeriod; // time between two messages
097  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
098  private Publisher publisher;
099  private boolean connected = false;
100
101  /**
102   * We want to limit the size of the protobuf message sent, do fit into a single packet. a
103   * reasonable size for ip / ethernet is less than 1Kb.
104   */
  public static final int MAX_SERVER_PER_MESSAGE = 10;
106
107  /**
108   * If a server dies, we're sending the information multiple times in case a receiver misses the
109   * message.
110   */
  public static final int NB_SEND = 5;
112
113  public ClusterStatusPublisher(HMaster master, Configuration conf,
114    Class<? extends Publisher> publisherClass) throws IOException {
115    super("ClusterStatusPublisher for=" + master.getName(), master,
116      conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
117    this.master = master;
118    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
119    try {
120      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
121    } catch (Exception e) {
122      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
123    }
124    this.publisher.connect(conf);
125    connected = true;
126  }
127
128  @Override
129  public String toString() {
130    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
131  }
132
133  // For tests only
134  protected ClusterStatusPublisher() {
135    master = null;
136    messagePeriod = 0;
137  }
138
139  @Override
140  protected void chore() {
141    if (!isConnected()) {
142      return;
143    }
144
145    List<ServerName> sns = generateDeadServersListToSend();
146    if (sns.isEmpty()) {
147      // Nothing to send. Done.
148      return;
149    }
150
151    final long curTime = EnvironmentEdgeManager.currentTime();
152    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
154      return;
155    }
156
157    // Ok, we're going to send something then.
158    lastMessageTime = curTime;
159
160    // We're reusing an existing protobuf message, but we don't send everything.
161    // This could be extended in the future, for example if we want to send stuff like the
162    // hbase:meta server name.
163    publisher.publish(ClusterMetricsBuilder.newBuilder().setHBaseVersion(VersionInfo.getVersion())
164      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
165      .setMasterName(master.getServerName()).setDeadServerNames(sns).build());
166  }
167
168  @Override
169  protected synchronized void cleanup() {
170    connected = false;
171    publisher.close();
172  }
173
174  private synchronized boolean isConnected() {
175    return this.connected;
176  }
177
178  /**
179   * Create the dead server to send. A dead server is sent NB_SEND times. We send at max
180   * MAX_SERVER_PER_MESSAGE at a time. if there are too many dead servers, we send the newly dead
181   * first.
182   */
183  protected List<ServerName> generateDeadServersListToSend() {
    // Get the servers that died recently and add them to the list of servers to track.
185    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
186    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
187      lastSent.putIfAbsent(dead.getFirst(), 0);
188    }
189
    // We send the newly dead servers first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    entries.sort(Map.Entry.comparingByValue());
198
199    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
201    List<ServerName> res = new ArrayList<>(max);
202
203    for (int i = 0; i < max; i++) {
204      Map.Entry<ServerName, Integer> toSend = entries.get(i);
205      if (toSend.getValue() >= (NB_SEND - 1)) {
206        lastSent.remove(toSend.getKey());
207      } else {
208        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
209      }
210
211      res.add(toSend.getKey());
212    }
213
214    return res;
215  }
216
217  /**
218   * Get the servers which died since a given timestamp. protected because it can be subclassed by
219   * the tests.
220   */
221  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
222    if (master.getServerManager() == null) {
223      return Collections.emptyList();
224    }
225
226    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
227  }
228
229  public interface Publisher extends Closeable {
230
231    void connect(Configuration conf) throws IOException;
232
233    void publish(ClusterMetrics cs);
234
235    @Override
236    void close();
237  }
238
239  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
240  public static class MulticastPublisher implements Publisher {
241    private DatagramChannel channel;
242    private final EventLoopGroup group = new NioEventLoopGroup(1,
243      new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
244        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
245
246    public MulticastPublisher() {
247    }
248
249    @Override
250    public String toString() {
251      return "channel=" + this.channel;
252    }
253
254    @Override
255    public void connect(Configuration conf) throws IOException {
256      String mcAddress =
257        conf.get(HConstants.STATUS_MULTICAST_ADDRESS, HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
258      int port =
259        conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT);
260      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
261        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
262      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
263
264      final InetAddress ina;
265      try {
266        ina = InetAddress.getByName(mcAddress);
267      } catch (UnknownHostException e) {
268        close();
269        throw new IOException("Can't connect to " + mcAddress, e);
270      }
271      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
272
273      InternetProtocolFamily family;
274      NetworkInterface ni;
275      if (niName != null) {
276        if (ina instanceof Inet6Address) {
277          family = InternetProtocolFamily.IPv6;
278        } else {
279          family = InternetProtocolFamily.IPv4;
280        }
281        ni = NetworkInterface.getByName(niName);
282      } else {
283        InetAddress localAddress;
284        if (ina instanceof Inet6Address) {
285          localAddress = Addressing.getIp6Address();
286          family = InternetProtocolFamily.IPv6;
287        } else {
288          localAddress = Addressing.getIp4Address();
289          family = InternetProtocolFamily.IPv4;
290        }
291        ni = NetworkInterface.getByInetAddress(localAddress);
292      }
293      Bootstrap b = new Bootstrap();
294      b.group(group)
295        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
296        .option(ChannelOption.SO_REUSEADDR, true).handler(new ClusterMetricsEncoder(isa));
297      try {
298        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
299        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
300        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
301        channel.connect(isa).sync();
        // Set the interface name into the configuration in case many networks are available.
        // Do this for tests so that server and client use the same interface (presuming they
        // share the same Configuration). TestAsyncTableRSCrashPublish was failing when
        // connected to a VPN because extra networks were available, with the Master binding
        // on one interface and the client on another.
306        if (ni != null) {
307          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
308        }
309      } catch (InterruptedException e) {
310        close();
311        throw ExceptionUtil.asInterrupt(e);
312      }
313    }
314
315    private static final class HBaseDatagramChannelFactory<T extends Channel>
316      implements ChannelFactory<T> {
317      private final Class<? extends T> clazz;
318      private final InternetProtocolFamily family;
319
320      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
321        this.clazz = clazz;
322        this.family = family;
323      }
324
325      @Override
326      public T newChannel() {
327        try {
328          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
329            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
330
331        } catch (Throwable t) {
332          throw new ChannelException("Unable to create Channel from class " + clazz, t);
333        }
334      }
335
336      @Override
337      public String toString() {
338        return StringUtil.simpleClassName(clazz) + ".class";
339      }
340    }
341
342    private static final class ClusterMetricsEncoder
343      extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;
345
346      private ClusterMetricsEncoder(InetSocketAddress isa) {
347        this.isa = isa;
348      }
349
350      @Override
351      protected void encode(ChannelHandlerContext channelHandlerContext,
352        ClusterMetrics clusterStatus, List<Object> objects) {
353        objects.add(new DatagramPacket(Unpooled
354          .wrappedBuffer(ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
355      }
356    }
357
358    @Override
359    public void publish(ClusterMetrics cs) {
360      LOG.info("PUBLISH {}", cs);
361      channel.writeAndFlush(cs).syncUninterruptibly();
362    }
363
364    @Override
365    public void close() {
366      if (channel != null) {
367        channel.close();
368      }
369      group.shutdownGracefully();
370    }
371  }
372}