001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.SocketAddress;
022import java.util.Collections;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicReference;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.client.MetricsConnection;
029import org.apache.hadoop.hbase.exceptions.X509Exception;
030import org.apache.hadoop.hbase.io.FileChangeWatcher;
031import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
032import org.apache.hadoop.hbase.util.Pair;
033import org.apache.yetus.audience.InterfaceAudience;
034
035import org.apache.hbase.thirdparty.io.netty.channel.Channel;
036import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
037import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
038import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
039import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
040import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
041
042/**
043 * Netty client for the requests and responses.
044 * @since 2.0.0
045 */
046@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
047public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
048
049  final EventLoopGroup group;
050
051  final Class<? extends Channel> channelClass;
052
053  private final boolean shutdownGroupWhenClose;
054  private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
055  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
056  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
057
058  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
059    MetricsConnection metrics) {
060    this(configuration, clusterId, localAddress, metrics, Collections.emptyMap());
061  }
062
063  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
064    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
065    super(configuration, clusterId, localAddress, metrics, connectionAttributes);
066    Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
067      NettyRpcClientConfigHelper.getEventLoopConfig(conf);
068    if (groupAndChannelClass == null) {
069      // Use our own EventLoopGroup.
070      int threadCount =
071        conf.getInt(NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0);
072      this.group = new NioEventLoopGroup(threadCount,
073        new DefaultThreadFactory("RPCClient(own)-NioEventLoopGroup", true, Thread.NORM_PRIORITY));
074      this.channelClass = NioSocketChannel.class;
075      this.shutdownGroupWhenClose = true;
076    } else {
077      this.group = groupAndChannelClass.getFirst();
078      this.channelClass = groupAndChannelClass.getSecond();
079      this.shutdownGroupWhenClose = false;
080    }
081  }
082
083  /** Used in test only. */
084  public NettyRpcClient(Configuration configuration) {
085    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap());
086  }
087
088  @Override
089  protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {
090    return new NettyRpcConnection(this, remoteId);
091  }
092
093  @Override
094  protected void closeInternal() {
095    if (shutdownGroupWhenClose) {
096      group.shutdownGracefully();
097    }
098    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
099    if (ks != null) {
100      ks.stop();
101    }
102    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
103    if (ts != null) {
104      ts.stop();
105    }
106  }
107
108  SslContext getSslContext() throws X509Exception, IOException {
109    SslContext result = sslContextForClient.get();
110    if (result == null) {
111      result = X509Util.createSslContextForClient(conf);
112      if (!sslContextForClient.compareAndSet(null, result)) {
113        // lost the race, another thread already set the value
114        result = sslContextForClient.get();
115      } else if (
116        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
117          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
118      ) {
119        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
120          () -> sslContextForClient.set(null));
121      }
122    }
123    return result;
124  }
125}