001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.hadoop.hbase.master;
021
022import java.io.Closeable;
023import java.io.IOException;
024import java.net.Inet6Address;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.net.NetworkInterface;
028import java.net.UnknownHostException;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.Comparator;
032import java.util.List;
033import java.util.Map;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.ClusterMetrics;
038import org.apache.hadoop.hbase.ClusterMetricsBuilder;
039import org.apache.hadoop.hbase.HBaseInterfaceAudience;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.ScheduledChore;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.util.Addressing;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.ExceptionUtil;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.hadoop.hbase.util.ReflectionUtils;
048import org.apache.hadoop.hbase.util.Threads;
049import org.apache.hadoop.hbase.util.VersionInfo;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
052import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
053import org.apache.hbase.thirdparty.io.netty.channel.Channel;
054import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
055import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
056import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
057import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
058import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
059import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
060import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
061import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
062import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
063import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
064import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
065import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069
070/**
071 * Class to publish the cluster status to the client. This allows them to know immediately
072 *  the dead region servers, hence to cut the connection they have with them, eventually stop
073 *  waiting on the socket. This improves the mean time to recover, and as well allows to increase
074 *  on the client the different timeouts, as the dead servers will be detected separately.
075 */
076@InterfaceAudience.Private
077public class ClusterStatusPublisher extends ScheduledChore {
078  private static Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
079  /**
080   * The implementation class used to publish the status. Default is null (no publish).
081   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
082   * status.
083   */
084  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
085  public static final Class<? extends ClusterStatusPublisher.Publisher>
086      DEFAULT_STATUS_PUBLISHER_CLASS =
087      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;
088
089  /**
090   * The minimum time between two status messages, in milliseconds.
091   */
092  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
093  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
094
095  private long lastMessageTime = 0;
096  private final HMaster master;
097  private final int messagePeriod; // time between two message
098  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
099  private Publisher publisher;
100  private boolean connected = false;
101
102  /**
103   * We want to limit the size of the protobuf message sent, do fit into a single packet.
104   * a reasonable size for ip / ethernet is less than 1Kb.
105   */
106  public final static int MAX_SERVER_PER_MESSAGE = 10;
107
108  /**
109   * If a server dies, we're sending the information multiple times in case a receiver misses the
110   * message.
111   */
112  public final static int NB_SEND = 5;
113
114  public ClusterStatusPublisher(HMaster master, Configuration conf,
115                                Class<? extends Publisher> publisherClass)
116      throws IOException {
117    super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
118      STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
119    this.master = master;
120    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
121    try {
122      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
123    } catch (Exception e) {
124      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
125    }
126    this.publisher.connect(conf);
127    connected = true;
128  }
129
130  @Override
131  public String toString() {
132    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
133  }
134
135  // For tests only
136  protected ClusterStatusPublisher() {
137    master = null;
138    messagePeriod = 0;
139  }
140
141  @Override
142  protected void chore() {
143    if (!isConnected()) {
144      return;
145    }
146
147    List<ServerName> sns = generateDeadServersListToSend();
148    if (sns.isEmpty()) {
149      // Nothing to send. Done.
150      return;
151    }
152
153    final long curTime = EnvironmentEdgeManager.currentTime();
154    if (lastMessageTime > curTime - messagePeriod) {
155      // We already sent something less than 10 second ago. Done.
156      return;
157    }
158
159    // Ok, we're going to send something then.
160    lastMessageTime = curTime;
161
162    // We're reusing an existing protobuf message, but we don't send everything.
163    // This could be extended in the future, for example if we want to send stuff like the
164    //  hbase:meta server name.
165    publisher.publish(ClusterMetricsBuilder.newBuilder()
166      .setHBaseVersion(VersionInfo.getVersion())
167      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
168      .setMasterName(master.getServerName())
169      .setDeadServerNames(sns)
170      .build());
171  }
172
173  @Override
174  protected synchronized void cleanup() {
175    connected = false;
176    publisher.close();
177  }
178
179  private synchronized boolean isConnected() {
180    return this.connected;
181  }
182
183  /**
184   * Create the dead server to send. A dead server is sent NB_SEND times. We send at max
185   * MAX_SERVER_PER_MESSAGE at a time. if there are too many dead servers, we send the newly
186   * dead first.
187   */
188  protected List<ServerName> generateDeadServersListToSend() {
189    // We're getting the message sent since last time, and add them to the list
190    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
191    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
192      lastSent.putIfAbsent(dead.getFirst(), 0);
193    }
194
195    // We're sending the new deads first.
196    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
197    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
198      @Override
199      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
200        return o1.getValue().compareTo(o2.getValue());
201      }
202    });
203
204    // With a limit of MAX_SERVER_PER_MESSAGE
205    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
206    List<ServerName> res = new ArrayList<>(max);
207
208    for (int i = 0; i < max; i++) {
209      Map.Entry<ServerName, Integer> toSend = entries.get(i);
210      if (toSend.getValue() >= (NB_SEND - 1)) {
211        lastSent.remove(toSend.getKey());
212      } else {
213        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
214      }
215
216      res.add(toSend.getKey());
217    }
218
219    return res;
220  }
221
222  /**
223   * Get the servers which died since a given timestamp.
224   * protected because it can be subclassed by the tests.
225   */
226  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
227    if (master.getServerManager() == null) {
228      return Collections.emptyList();
229    }
230
231    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
232  }
233
234
235  public interface Publisher extends Closeable {
236
237    void connect(Configuration conf) throws IOException;
238
239    void publish(ClusterMetrics cs);
240
241    @Override
242    void close();
243  }
244
245  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
246  public static class MulticastPublisher implements Publisher {
247    private DatagramChannel channel;
248    private final EventLoopGroup group = new NioEventLoopGroup(
249        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));
250
251    public MulticastPublisher() {
252    }
253
254    @Override
255    public String toString() {
256      return "channel=" + this.channel;
257    }
258
259    @Override
260    public void connect(Configuration conf) throws IOException {
261      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
262          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
263      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
264          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
265      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
266        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
267      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
268
269      final InetAddress ina;
270      try {
271        ina = InetAddress.getByName(mcAddress);
272      } catch (UnknownHostException e) {
273        close();
274        throw new IOException("Can't connect to " + mcAddress, e);
275      }
276      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
277
278      InternetProtocolFamily family;
279      NetworkInterface ni;
280      if (niName != null) {
281        if (ina instanceof Inet6Address) {
282          family = InternetProtocolFamily.IPv6;
283        } else {
284          family = InternetProtocolFamily.IPv4;
285        }
286        ni = NetworkInterface.getByName(niName);
287      } else {
288        InetAddress localAddress;
289        if (ina instanceof Inet6Address) {
290          localAddress = Addressing.getIp6Address();
291          family = InternetProtocolFamily.IPv6;
292        } else {
293          localAddress = Addressing.getIp4Address();
294          family = InternetProtocolFamily.IPv4;
295        }
296        ni = NetworkInterface.getByInetAddress(localAddress);
297      }
298      Bootstrap b = new Bootstrap();
299      b.group(group)
300        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
301        .option(ChannelOption.SO_REUSEADDR, true)
302        .handler(new ClusterMetricsEncoder(isa));
303      try {
304        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
305        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
306        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
307        channel.connect(isa).sync();
308        // Set into configuration in case many networks available. Do this for tests so that
309        // server and client use same Interface (presuming share same Configuration).
310        // TestAsyncTableRSCrashPublish was failing when connected to VPN because extra networks
311        // available with Master binding on one Interface and client on another so test failed.
312        if (ni != null) {
313          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
314        }
315      } catch (InterruptedException e) {
316        close();
317        throw ExceptionUtil.asInterrupt(e);
318      }
319    }
320
321    private static final class HBaseDatagramChannelFactory<T extends Channel>
322        implements ChannelFactory<T> {
323      private final Class<? extends T> clazz;
324      private final InternetProtocolFamily family;
325
326      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
327        this.clazz = clazz;
328        this.family = family;
329      }
330
331      @Override
332      public T newChannel() {
333        try {
334          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
335            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
336
337        } catch (Throwable t) {
338          throw new ChannelException("Unable to create Channel from class " + clazz, t);
339        }
340      }
341
342      @Override
343      public String toString() {
344        return StringUtil.simpleClassName(clazz) + ".class";
345      }
346    }
347
348    private static final class ClusterMetricsEncoder
349        extends MessageToMessageEncoder<ClusterMetrics> {
350      final private InetSocketAddress isa;
351
352      private ClusterMetricsEncoder(InetSocketAddress isa) {
353        this.isa = isa;
354      }
355
356      @Override
357      protected void encode(ChannelHandlerContext channelHandlerContext,
358        ClusterMetrics clusterStatus, List<Object> objects) {
359        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
360          ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
361      }
362    }
363
364    @Override
365    public void publish(ClusterMetrics cs) {
366      LOG.info("PUBLISH {}", cs);
367      channel.writeAndFlush(cs).syncUninterruptibly();
368    }
369
370    @Override
371    public void close() {
372      if (channel != null) {
373        channel.close();
374      }
375      group.shutdownGracefully();
376    }
377  }
378}