/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;


/**
 * Class to publish the cluster status to the clients. This allows them to learn immediately
 * which region servers are dead, so they can cut their connections to those servers instead of
 * waiting on the socket. This improves the mean time to recover, and also allows the client-side
 * timeouts to be raised, since dead servers are detected through a separate channel.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
  /**
   * The implementation class used to publish the status. Use
   * org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher (the default) to
   * multicast the status.
   */
  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
  public static final Class<? extends ClusterStatusPublisher.Publisher>
      DEFAULT_STATUS_PUBLISHER_CLASS =
      org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;

  /**
   * The minimum time between two status messages, in milliseconds.
   */
  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
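
  // A minimal configuration sketch (illustration only, not part of the original file): selecting
  // the publisher implementation and the publish period through the keys defined above. The
  // "conf" variable is a hypothetical org.apache.hadoop.conf.Configuration instance.
  //
  //   conf.setClass(STATUS_PUBLISHER_CLASS, MulticastPublisher.class, Publisher.class);
  //   conf.setInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);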

  private long lastMessageTime = 0;
  private final HMaster master;
  private final int messagePeriod; // time between two messages
  private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<>();
  private Publisher publisher;
  private boolean connected = false;

  /**
   * We want to limit the size of the protobuf message sent, to fit into a single packet.
   * A reasonable size for IP / Ethernet is less than 1KB.
   */
  public final static int MAX_SERVER_PER_MESSAGE = 10;

  /**
   * If a server dies, we send the information multiple times in case a receiver misses the
   * message.
   */
  public final static int NB_SEND = 5;
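
  // Worked example (illustration only): with the constants above, if 25 region servers die at
  // once, the first status message carries the 10 servers announced the fewest times so far,
  // the next chore run carries the next batch, and so on; each dead server is re-announced in
  // NB_SEND (5) messages before it is dropped from the lastSent map below.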

  public ClusterStatusPublisher(HMaster master, Configuration conf,
                                Class<? extends Publisher> publisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
      STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);
    connected = true;
  }

  // For tests only
  protected ClusterStatusPublisher() {
    master = null;
    messagePeriod = 0;
  }

  @Override
  protected void chore() {
    if (!isConnected()) {
      return;
    }

    List<ServerName> sns = generateDeadServersListToSend();
    if (sns.isEmpty()) {
      // Nothing to send. Done.
      return;
    }

    final long curTime = EnvironmentEdgeManager.currentTime();
    if (lastMessageTime > curTime - messagePeriod) {
      // We already sent a message less than messagePeriod milliseconds ago. Done.
      return;
    }

    // Ok, we're going to send something then.
    lastMessageTime = curTime;

    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    publisher.publish(ClusterMetricsBuilder.newBuilder()
      .setHBaseVersion(VersionInfo.getVersion())
      .setClusterId(master.getMasterFileSystem().getClusterId().toString())
      .setMasterName(master.getServerName())
      .setDeadServerNames(sns)
      .build());
  }

  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

  private synchronized boolean isConnected() {
    return this.connected;
  }

  /**
   * Create the list of dead servers to send. A dead server is sent NB_SEND times. We send at most
   * MAX_SERVER_PER_MESSAGE servers at a time. If there are too many dead servers, we send the
   * newly dead first.
   */
  protected List<ServerName> generateDeadServersListToSend() {
    // We're getting the servers that died since the last message, and adding them to the list.
    long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
      lastSent.putIfAbsent(dead.getFirst(), 0);
    }

    // We're sending the newly dead servers first.
    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
      @Override
      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
        return o1.getValue().compareTo(o2.getValue());
      }
    });

    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
    List<ServerName> res = new ArrayList<>(max);

    for (int i = 0; i < max; i++) {
      Map.Entry<ServerName, Integer> toSend = entries.get(i);
      if (toSend.getValue() >= (NB_SEND - 1)) {
        lastSent.remove(toSend.getKey());
      } else {
        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
      }

      res.add(toSend.getKey());
    }

    return res;
  }

  /**
   * Get the servers which died since a given timestamp.
   * Protected so it can be overridden by tests.
   */
  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
    if (master.getServerManager() == null) {
      return Collections.emptyList();
    }

    return master.getServerManager().getDeadServers().copyDeadServersSince(since);
  }
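
  // Test-only sketch (an assumption, not taken from the HBase test suite): a test subclass can
  // override getDeadServers() to feed a fixed list without a running ServerManager, e.g.
  //
  //   @Override
  //   protected List<Pair<ServerName, Long>> getDeadServers(long since) {
  //     return Collections.singletonList(new Pair<>(someServerName, 1L));
  //   }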


  public interface Publisher extends Closeable {

    void connect(Configuration conf) throws IOException;

    void publish(ClusterMetrics cs);

    @Override
    void close();
  }
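
  // A minimal alternative Publisher sketch (illustration only, not shipped with HBase): logging
  // the status instead of multicasting it. Any implementation needs a no-arg constructor, since
  // the ClusterStatusPublisher constructor above instantiates it reflectively.
  //
  //   public static class LoggingPublisher implements Publisher {
  //     @Override public void connect(Configuration conf) { }
  //     @Override public void publish(ClusterMetrics cs) {
  //       System.out.println("dead servers: " + cs.getDeadServerNames());
  //     }
  //     @Override public void close() { }
  //   }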

  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final EventLoopGroup group = new NioEventLoopGroup(
        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

    public MulticastPublisher() {
    }

    @Override
    public void connect(Configuration conf) throws IOException {
      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);

      final InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        close();
        throw new IOException("Can't connect to " + mcAddress, e);
      }

      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);

      InternetProtocolFamily family;
      NetworkInterface ni;
      if (niName != null) {
        if (ina instanceof Inet6Address) {
          family = InternetProtocolFamily.IPv6;
        } else {
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByName(niName);
      } else {
        InetAddress localAddress;
        if (ina instanceof Inet6Address) {
          localAddress = Addressing.getIp6Address();
          family = InternetProtocolFamily.IPv6;
        } else {
          localAddress = Addressing.getIp4Address();
          family = InternetProtocolFamily.IPv4;
        }
        ni = NetworkInterface.getByInetAddress(localAddress);
      }

      Bootstrap b = new Bootstrap();
      b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));

      try {
        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
        channel.connect(isa).sync();
      } catch (InterruptedException e) {
        close();
        throw ExceptionUtil.asInterrupt(e);
      }
    }
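
    // Usage sketch (assumption, mirroring how the enclosing chore drives a Publisher): connect()
    // binds to an ephemeral port, joins the multicast group and connects the datagram channel;
    // each publish() then sends one datagram encoded by ClusterMetricsEncoder below.
    //
    //   MulticastPublisher p = new MulticastPublisher();
    //   p.connect(conf);           // conf: a hypothetical Configuration with the multicast keys
    //   p.publish(clusterMetrics);
    //   p.close();                 // closes the channel and shuts the event loop down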

    private static final class HBaseDatagramChannelFactory<T extends Channel>
      implements ChannelFactory<T> {
      private final Class<? extends T> clazz;
      private final InternetProtocolFamily family;

      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
        this.clazz = clazz;
        this.family = family;
      }

      @Override
      public T newChannel() {
        try {
          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
        } catch (Throwable t) {
          throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
      }

      @Override
      public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
      }
    }

    private static final class ClusterMetricsEncoder
        extends MessageToMessageEncoder<ClusterMetrics> {
      private final InetSocketAddress isa;

      private ClusterMetricsEncoder(InetSocketAddress isa) {
        this.isa = isa;
      }

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext,
        ClusterMetrics clusterStatus, List<Object> objects) {
        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(
          ClusterMetricsBuilder.toClusterStatus(clusterStatus).toByteArray()), isa));
      }
    }

    @Override
    public void publish(ClusterMetrics cs) {
      channel.writeAndFlush(cs).syncUninterruptibly();
    }

    @Override
    public void close() {
      if (channel != null) {
        channel.close();
      }
      group.shutdownGracefully();
    }
  }
}