001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.InetSocketAddress;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.CellScanner;
027import org.apache.hadoop.hbase.HBaseInterfaceAudience;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.hadoop.hbase.security.HBasePolicyProvider;
032import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
033import org.apache.hadoop.hbase.util.Pair;
034import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
039import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
040import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
041import org.apache.hbase.thirdparty.com.google.protobuf.Message;
042import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
043import org.apache.hbase.thirdparty.io.netty.channel.Channel;
044import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
045import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
046import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
047import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
048import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
049import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
050import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
051import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
052import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
053import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
054import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
055import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
056
057/**
058 * An RPC server with Netty4 implementation.
059 * @since 2.0.0
060 */
061@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
062public class NettyRpcServer extends RpcServer {
063  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
064
065  /**
066   * Name of property to change netty rpc server eventloop thread count. Default is 0.
067   * Tests may set this down from unlimited.
068   */
069  public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY =
070    "hbase.netty.eventloop.rpcserver.thread.count";
071  private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
072
073  private final InetSocketAddress bindAddress;
074
075  private final CountDownLatch closed = new CountDownLatch(1);
076  private final Channel serverChannel;
077  private final ChannelGroup allChannels =
078    new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
079
080  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
081      InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
082      boolean reservoirEnabled) throws IOException {
083    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
084    this.bindAddress = bindAddress;
085    EventLoopGroup eventLoopGroup;
086    Class<? extends ServerChannel> channelClass;
087    if (server instanceof HRegionServer) {
088      NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig();
089      eventLoopGroup = config.group();
090      channelClass = config.serverChannelClass();
091    } else {
092      int threadCount = server == null? EVENTLOOP_THREADCOUNT_DEFAULT:
093        server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
094          EVENTLOOP_THREADCOUNT_DEFAULT);
095      eventLoopGroup = new NioEventLoopGroup(threadCount,
096        new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
097      channelClass = NioServerSocketChannel.class;
098    }
099    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
100        .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
101        .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
102        .childOption(ChannelOption.SO_REUSEADDR, true)
103        .childHandler(new ChannelInitializer<Channel>() {
104
105          @Override
106          protected void initChannel(Channel ch) throws Exception {
107            ChannelPipeline pipeline = ch.pipeline();
108            FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
109            preambleDecoder.setSingleDecode(true);
110            pipeline.addLast("preambleDecoder", preambleDecoder);
111            pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
112            pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
113            pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
114            pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
115          }
116        });
117    try {
118      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
119      LOG.info("Bind to {}", serverChannel.localAddress());
120    } catch (InterruptedException e) {
121      throw new InterruptedIOException(e.getMessage());
122    }
123    initReconfigurable(conf);
124    this.scheduler.init(new RpcSchedulerContext(this));
125  }
126
127  @VisibleForTesting
128  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
129    return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
130  }
131
132  @Override
133  public synchronized void start() {
134    if (started) {
135      return;
136    }
137    authTokenSecretMgr = createSecretManager();
138    if (authTokenSecretMgr != null) {
139      setSecretManager(authTokenSecretMgr);
140      authTokenSecretMgr.start();
141    }
142    this.authManager = new ServiceAuthorizationManager();
143    HBasePolicyProvider.init(conf, authManager);
144    scheduler.start();
145    started = true;
146  }
147
148  @Override
149  public synchronized void stop() {
150    if (!running) {
151      return;
152    }
153    LOG.info("Stopping server on " + this.serverChannel.localAddress());
154    if (authTokenSecretMgr != null) {
155      authTokenSecretMgr.stop();
156      authTokenSecretMgr = null;
157    }
158    allChannels.close().awaitUninterruptibly();
159    serverChannel.close();
160    scheduler.stop();
161    closed.countDown();
162    running = false;
163  }
164
165  @Override
166  public synchronized void join() throws InterruptedException {
167    closed.await();
168  }
169
170  @Override
171  public synchronized InetSocketAddress getListenerAddress() {
172    return ((InetSocketAddress) serverChannel.localAddress());
173  }
174
175  @Override
176  public void setSocketSendBufSize(int size) {
177  }
178
179  @Override
180  public int getNumOpenConnections() {
181    int channelsCount = allChannels.size();
182    // allChannels also contains the server channel, so exclude that from the count.
183    return channelsCount > 0 ? channelsCount - 1 : channelsCount;
184  }
185
186  @Override
187  public Pair<Message, CellScanner> call(BlockingService service,
188      MethodDescriptor md, Message param, CellScanner cellScanner,
189      long receiveTime, MonitoredRPCHandler status) throws IOException {
190    return call(service, md, param, cellScanner, receiveTime, status,
191        System.currentTimeMillis(), 0);
192  }
193
194  @Override
195  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
196      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
197      long startTime, int timeout) throws IOException {
198    NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
199        -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
200    return call(fakeCall, status);
201  }
202}