001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
021import org.apache.hbase.thirdparty.io.netty.channel.Channel;
022import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
023import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
024import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
025import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
026import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
027import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
028import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
029import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
030import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
031import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
032import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
033import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
034
035import java.io.IOException;
036import java.io.InterruptedIOException;
037import java.net.InetSocketAddress;
038import java.util.List;
039import java.util.concurrent.CountDownLatch;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.CellScanner;
043import org.apache.hadoop.hbase.HBaseInterfaceAudience;
044import org.apache.hadoop.hbase.Server;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
049import org.apache.hadoop.hbase.regionserver.HRegionServer;
050import org.apache.hadoop.hbase.security.HBasePolicyProvider;
051import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
052import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
053import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
054import org.apache.hbase.thirdparty.com.google.protobuf.Message;
055import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
058
059/**
060 * An RPC server with Netty4 implementation.
061 * @since 2.0.0
062 */
063@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
064public class NettyRpcServer extends RpcServer {
065
066  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
067
068  private final InetSocketAddress bindAddress;
069
070  private final CountDownLatch closed = new CountDownLatch(1);
071  private final Channel serverChannel;
072  private final ChannelGroup allChannels =
073    new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
074
075  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
076      InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
077      boolean reservoirEnabled) throws IOException {
078    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
079    this.bindAddress = bindAddress;
080    EventLoopGroup eventLoopGroup;
081    Class<? extends ServerChannel> channelClass;
082    if (server instanceof HRegionServer) {
083      NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig();
084      eventLoopGroup = config.group();
085      channelClass = config.serverChannelClass();
086    } else {
087      eventLoopGroup = new NioEventLoopGroup(0,
088          new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
089      channelClass = NioServerSocketChannel.class;
090    }
091    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
092        .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
093        .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
094        .childHandler(new ChannelInitializer<Channel>() {
095
096          @Override
097          protected void initChannel(Channel ch) throws Exception {
098            ChannelPipeline pipeline = ch.pipeline();
099            FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
100            preambleDecoder.setSingleDecode(true);
101            pipeline.addLast("preambleDecoder", preambleDecoder);
102            pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
103            pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
104            pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
105            pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
106          }
107        });
108    try {
109      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
110      LOG.info("Bind to {}", serverChannel.localAddress());
111    } catch (InterruptedException e) {
112      throw new InterruptedIOException(e.getMessage());
113    }
114    initReconfigurable(conf);
115    this.scheduler.init(new RpcSchedulerContext(this));
116  }
117
118  @VisibleForTesting
119  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
120    return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
121  }
122
123  @Override
124  public synchronized void start() {
125    if (started) {
126      return;
127    }
128    authTokenSecretMgr = createSecretManager();
129    if (authTokenSecretMgr != null) {
130      setSecretManager(authTokenSecretMgr);
131      authTokenSecretMgr.start();
132    }
133    this.authManager = new ServiceAuthorizationManager();
134    HBasePolicyProvider.init(conf, authManager);
135    scheduler.start();
136    started = true;
137  }
138
139  @Override
140  public synchronized void stop() {
141    if (!running) {
142      return;
143    }
144    LOG.info("Stopping server on " + this.serverChannel.localAddress());
145    if (authTokenSecretMgr != null) {
146      authTokenSecretMgr.stop();
147      authTokenSecretMgr = null;
148    }
149    allChannels.close().awaitUninterruptibly();
150    serverChannel.close();
151    scheduler.stop();
152    closed.countDown();
153    running = false;
154  }
155
156  @Override
157  public synchronized void join() throws InterruptedException {
158    closed.await();
159  }
160
161  @Override
162  public synchronized InetSocketAddress getListenerAddress() {
163    return ((InetSocketAddress) serverChannel.localAddress());
164  }
165
166  @Override
167  public void setSocketSendBufSize(int size) {
168  }
169
170  @Override
171  public int getNumOpenConnections() {
172    int channelsCount = allChannels.size();
173    // allChannels also contains the server channel, so exclude that from the count.
174    return channelsCount > 0 ? channelsCount - 1 : channelsCount;
175  }
176
177  @Override
178  public Pair<Message, CellScanner> call(BlockingService service,
179      MethodDescriptor md, Message param, CellScanner cellScanner,
180      long receiveTime, MonitoredRPCHandler status) throws IOException {
181    return call(service, md, param, cellScanner, receiveTime, status,
182        System.currentTimeMillis(), 0);
183  }
184
185  @Override
186  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
187      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
188      long startTime, int timeout) throws IOException {
189    NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
190        -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
191    return call(fakeCall, status);
192  }
193}