/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
 * Class to publish the cluster status to the clients. This allows them to learn immediately
 * about dead region servers, and so to cut the connections they have with those servers instead
 * of waiting on socket timeouts. This improves the mean time to recover, and also allows clients
 * to use larger timeouts, since dead servers are detected through a separate channel.
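 * <p>A minimal enabling sketch (an assumption for illustration: the
 * {@code hbase.status.published} switch, off by default, is what turns this chore on at the
 * master):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.setBoolean("hbase.status.published", true); // enable status publishing on the master
 * // Optionally pick the publisher implementation:
 * conf.setClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
 *   ClusterStatusPublisher.MulticastPublisher.class, ClusterStatusPublisher.Publisher.class);
 * }</pre>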
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
  /**
   * The implementation class used to publish the status. Default is null (nothing is published).
   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast
   * the status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages, in milliseconds
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, to fit into a single packet.
   * A reasonable size for IP / Ethernet is less than 1KB.
   */
  public static final int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public static final int NB_SEND = 5;

  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
      STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  @Override
  public String toString() {
    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
      .setHBaseVersion(VersionInfo.getVersion())
      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
      .setMasterName(master.getServerName())
      .setDeadServerNames(sns)
      .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at
   * most MAX_SERVER_PER_MESSAGE servers at a time. If there are too many dead servers, we send
   * the most recently dead first.
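   * <p>An illustrative trace (not from the source): a newly dead server enters {@code lastSent}
   * with count 0, is included in each of the next {@value #NB_SEND} publications while its count
   * climbs to 4, and is then dropped from the map. Because entries are sorted by ascending send
   * count, a fresh death (count 0) always wins a slot over an older one (count 3) when more than
   * {@value #MAX_SERVER_PER_MESSAGE} servers are pending.</p>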
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We get the servers that died since the last message (with a margin of one extra period),
    // and add them to the list of servers to publish.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We send the newly dead servers first: they have the lowest send count.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    entries.sort(Map.Entry.comparingByValue());

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }

  /**
   * Get the servers which died since a given timestamp. Protected so it can be overridden by
   * tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }

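  /**
   * Contract used by the chore: {@link #connect(Configuration)} is called once from the
   * ClusterStatusPublisher constructor, {@link #publish(ClusterMetrics)} on every period that
   * has something to send, and {@link #close()} from cleanup. A hypothetical no-network sketch,
   * just to illustrate the lifecycle (the class name is an assumption, not part of HBase):
   * <pre>{@code
   * class LoggingPublisher implements ClusterStatusPublisher.Publisher {
   *   public void connect(Configuration conf) throws IOException {
   *     // Acquire resources here; called once before the first publish.
   *   }
   *
   *   public void publish(ClusterMetrics cs) {
   *     LoggerFactory.getLogger(LoggingPublisher.class).info("status={}", cs);
   *   }
   *
   *   public void close() {
   *     // Release resources; called from cleanup() when the chore shuts down.
   *   }
   * }
   * }</pre>
   */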
  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }

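  /**
   * Publishes the status over UDP multicast. The multicast address, port, bind address and
   * network interface all come from the Configuration; a connection sketch (the literal values
   * shown are assumptions for illustration, the real defaults live in {@link HConstants}):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(HConstants.STATUS_MULTICAST_ADDRESS, "226.1.1.3"); // group to join (assumed default)
   * conf.setInt(HConstants.STATUS_MULTICAST_PORT, 16100);       // assumed default port
   * MulticastPublisher publisher = new MulticastPublisher();
   * publisher.connect(conf);
   * }</pre>
   */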
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(1,
      new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    public MulticastPublisher() {
    }
    @Override
    public String toString() {
      return "channel=" + this.channel;
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }
      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }
      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));
      try {
        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
        // Save the interface name into the configuration in case several networks are
        // available, so that server and client use the same interface (presuming they share the
        // same Configuration). TestAsyncTableRSCrashPublish used to fail on machines connected
        // to a VPN: with the extra networks, the Master bound to one interface and the client
        // to another.
        if (ni != null) {
          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
        }
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }

    private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
        extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
        ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
          ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      LOG.info("PUBLISH {}", cs);
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}