001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.SocketAddress;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseInterfaceAudience;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.client.MetricsConnection;
026import org.apache.hadoop.hbase.util.NettyFutureUtils;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.yetus.audience.InterfaceAudience;
029
030import org.apache.hbase.thirdparty.io.netty.channel.Channel;
031import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
032import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
033import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
034import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
035
036/**
037 * Netty client for the requests and responses.
038 * @since 2.0.0
039 */
040@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
041public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
042
043  final EventLoopGroup group;
044
045  final Class<? extends Channel> channelClass;
046
047  private final boolean shutdownGroupWhenClose;
048
049  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
050    MetricsConnection metrics) {
051    super(configuration, clusterId, localAddress, metrics);
052    Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
053      NettyRpcClientConfigHelper.getEventLoopConfig(conf);
054    if (groupAndChannelClass == null) {
055      // Use our own EventLoopGroup.
056      int threadCount =
057        conf.getInt(NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0);
058      this.group = new NioEventLoopGroup(threadCount,
059        new DefaultThreadFactory("RPCClient(own)-NioEventLoopGroup", true, Thread.NORM_PRIORITY));
060      this.channelClass = NioSocketChannel.class;
061      this.shutdownGroupWhenClose = true;
062    } else {
063      this.group = groupAndChannelClass.getFirst();
064      this.channelClass = groupAndChannelClass.getSecond();
065      this.shutdownGroupWhenClose = false;
066    }
067  }
068
069  /** Used in test only. */
070  NettyRpcClient(Configuration configuration) {
071    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
072  }
073
074  @Override
075  protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {
076    return new NettyRpcConnection(this, remoteId);
077  }
078
079  @Override
080  protected void closeInternal() {
081    if (shutdownGroupWhenClose) {
082      NettyFutureUtils.consume(group.shutdownGracefully());
083    }
084  }
085}