/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;


/**
 * Class to publish the cluster status to the clients. This allows clients to learn immediately
 * about dead region servers, so they can cut the connections they have with those servers instead
 * of waiting for a socket timeout. This improves the mean time to recovery, and also allows
 * clients to use larger timeouts, since dead servers are detected through a separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  /**
   * The implementation class used to publish the status. Defaults to
   * org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher, which multicasts
   * the status to the clients.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
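
  // Illustrative sketch (not part of this class): how a deployment might enable multicast status
  // publishing through the keys above. The "hbase.status.published" switch that makes HMaster
  // create this chore is an assumption about the surrounding master code; verify the key for
  // your HBase version.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("hbase.status.published", true);
  //   conf.set(STATUS_PUBLISHER_CLASS, MulticastPublisher.class.getName());
  //   conf.setInt(STATUS_PUBLISH_PERIOD, 10000); // at most one message every 10 seconds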

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent so that it fits into a single packet.
   * A reasonable size for IP / Ethernet is less than 1 KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;
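
  // Worked example of the two constants above (using the default publish period of 10000 ms):
  // a newly dead server is announced in up to NB_SEND = 5 consecutive messages, i.e. for roughly
  // 5 * 10 s = 50 s, and each message names at most MAX_SERVER_PER_MESSAGE = 10 servers, which
  // is intended to keep the protobuf payload within a single small UDP packet.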

  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
      STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent something less than messagePeriod ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
      .setHBaseVersion(VersionInfo.getVersion())
      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
      .setMasterName(master.getServerName())
      .setDeadServerNames(sns)
      .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at most
   * MAX_SERVER_PER_MESSAGE servers per message. If there are too many dead servers, we send the
   * newly dead ones first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We get the servers that died since the last message (with some margin), and add them to
    // the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We send the newly dead servers first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>();
    entries.addAll(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }
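
  // Worked example (illustrative): suppose 12 servers died recently and none has been announced
  // yet. The first message carries 10 of them (entries with the lowest send count sort first, so
  // newly dead servers take priority over ones already announced), the other 2 lead the next
  // message, and each server is dropped from lastSent after its NB_SEND-th announcement.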

  /**
   * Get the servers which died since a given timestamp.
   * Protected so it can be overridden by tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }


  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }
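
  // Sketch of an alternative Publisher implementation (illustrative only; the class below is
  // hypothetical and not part of HBase). Whatever class is configured through
  // STATUS_PUBLISHER_CLASS must expose a no-argument constructor, since it is instantiated
  // reflectively in the ClusterStatusPublisher constructor above.
  //
  //   public static class LoggingPublisher implements Publisher {
  //     private static final org.slf4j.Logger LOG =
  //         org.slf4j.LoggerFactory.getLogger(LoggingPublisher.class);
  //     @Override public void connect(Configuration conf) throws IOException {}
  //     @Override public void publish(ClusterMetrics cs) {
  //       LOG.info("dead servers: {}", cs.getDeadServerNames());
  //     }
  //     @Override public void close() {}
  //   }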

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }

      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));

      try {
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }
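
    // Receiving side (context only, stated as an assumption rather than a guarantee): clients are
    // expected to join the same multicast group and port, typically through the client-side
    // listener (org.apache.hadoop.hbase.client.ClusterStatusListener), and to decode the
    // ClusterStatus protobuf produced by ClusterMetricsEncoder below.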

    private static final class HBaseDatagramChannelFactory<T extends Channel>
      implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
            new Class[] { InternetProtocolFamily.class }, new Object[] { family });

        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
        extends MessageToMessageEncoder<ClusterMetrics> {
      final private InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
        ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
          ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}