001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.InetSocketAddress;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.HBaseServerBase;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.regionserver.HRegionServer;
030import org.apache.hadoop.hbase.security.HBasePolicyProvider;
031import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
032import org.apache.hadoop.hbase.util.ReflectionUtils;
033import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
039import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
040import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
041import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
042import org.apache.hbase.thirdparty.io.netty.channel.Channel;
043import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
044import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
045import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
046import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
047import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
048import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
049import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
050import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
051import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
052import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
053import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
054import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
055
056/**
057 * An RPC server with Netty4 implementation.
058 * @since 2.0.0
059 */
060@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
061public class NettyRpcServer extends RpcServer {
062  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
063
064  /**
065   * Name of property to change netty rpc server eventloop thread count. Default is 0. Tests may set
066   * this down from unlimited.
067   */
068  public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY =
069    "hbase.netty.eventloop.rpcserver.thread.count";
070  private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
071
072  /**
073   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
074   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
075   * and "heap", or, the name of a class implementing ByteBufAllocator.
076   * <p>
077   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
078   * controlled by platform specific code and documented system properties.
079   * <p>
080   * "heap" will prefer heap arena allocations.
081   */
082  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
083  static final String POOLED_ALLOCATOR_TYPE = "pooled";
084  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
085  static final String HEAP_ALLOCATOR_TYPE = "heap";
086
087  private final InetSocketAddress bindAddress;
088
089  private final CountDownLatch closed = new CountDownLatch(1);
090  private final Channel serverChannel;
091  private final ChannelGroup allChannels =
092    new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
093  private final ByteBufAllocator channelAllocator;
094
095  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
096    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
097    boolean reservoirEnabled) throws IOException {
098    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
099    this.bindAddress = bindAddress;
100    this.channelAllocator = getChannelAllocator(conf);
101    EventLoopGroup eventLoopGroup;
102    Class<? extends ServerChannel> channelClass;
103    if (server instanceof HRegionServer) {
104      NettyEventLoopGroupConfig config = ((HBaseServerBase) server).getEventLoopGroupConfig();
105      eventLoopGroup = config.group();
106      channelClass = config.serverChannelClass();
107    } else {
108      int threadCount = server == null
109        ? EVENTLOOP_THREADCOUNT_DEFAULT
110        : server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
111          EVENTLOOP_THREADCOUNT_DEFAULT);
112      eventLoopGroup = new NioEventLoopGroup(threadCount,
113        new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
114      channelClass = NioServerSocketChannel.class;
115    }
116    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
117      .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
118      .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
119      .childOption(ChannelOption.SO_REUSEADDR, true)
120      .childHandler(new ChannelInitializer<Channel>() {
121        @Override
122        protected void initChannel(Channel ch) throws Exception {
123          ch.config().setAllocator(channelAllocator);
124          ChannelPipeline pipeline = ch.pipeline();
125          FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
126          preambleDecoder.setSingleDecode(true);
127          pipeline.addLast("preambleDecoder", preambleDecoder);
128          pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
129          pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
130          pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
131          pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
132        }
133      });
134    try {
135      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
136      LOG.info("Bind to {}", serverChannel.localAddress());
137    } catch (InterruptedException e) {
138      throw new InterruptedIOException(e.getMessage());
139    }
140    initReconfigurable(conf);
141    this.scheduler.init(new RpcSchedulerContext(this));
142  }
143
144  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
145    final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
146    if (value != null) {
147      if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
148        LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
149        return PooledByteBufAllocator.DEFAULT;
150      } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
151        LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
152        return UnpooledByteBufAllocator.DEFAULT;
153      } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
154        LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
155        return HeapByteBufAllocator.DEFAULT;
156      } else {
157        // If the value is none of the recognized labels, treat it as a class name. This allows the
158        // user to supply a custom implementation, perhaps for debugging.
159        try {
160          // ReflectionUtils throws UnsupportedOperationException if there are any problems.
161          ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
162          LOG.info("Using {} for buffer allocation", value);
163          return alloc;
164        } catch (ClassCastException | UnsupportedOperationException e) {
165          throw new IOException(e);
166        }
167      }
168    } else {
169      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
170      return PooledByteBufAllocator.DEFAULT;
171    }
172  }
173
174  @InterfaceAudience.Private
175  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
176    return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
177  }
178
179  @Override
180  public synchronized void start() {
181    if (started) {
182      return;
183    }
184    authTokenSecretMgr = createSecretManager();
185    if (authTokenSecretMgr != null) {
186      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
187      // LeaderElector start. See HBASE-25875
188      synchronized (authTokenSecretMgr) {
189        setSecretManager(authTokenSecretMgr);
190        authTokenSecretMgr.start();
191      }
192    }
193    this.authManager = new ServiceAuthorizationManager();
194    HBasePolicyProvider.init(conf, authManager);
195    scheduler.start();
196    started = true;
197  }
198
199  @Override
200  public synchronized void stop() {
201    if (!running) {
202      return;
203    }
204    LOG.info("Stopping server on " + this.serverChannel.localAddress());
205    if (authTokenSecretMgr != null) {
206      authTokenSecretMgr.stop();
207      authTokenSecretMgr = null;
208    }
209    allChannels.close().awaitUninterruptibly();
210    serverChannel.close();
211    scheduler.stop();
212    closed.countDown();
213    running = false;
214  }
215
216  @Override
217  public synchronized void join() throws InterruptedException {
218    closed.await();
219  }
220
221  @Override
222  public synchronized InetSocketAddress getListenerAddress() {
223    return ((InetSocketAddress) serverChannel.localAddress());
224  }
225
226  @Override
227  public void setSocketSendBufSize(int size) {
228  }
229
230  @Override
231  public int getNumOpenConnections() {
232    int channelsCount = allChannels.size();
233    // allChannels also contains the server channel, so exclude that from the count.
234    return channelsCount > 0 ? channelsCount - 1 : channelsCount;
235  }
236}