001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 021import org.apache.hbase.thirdparty.io.netty.channel.Channel; 022import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 023import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 024import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 025import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 026import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; 027import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 028import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 029import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 030import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel; 031import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder; 032import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; 033import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 034 035import java.io.IOException; 036import java.io.InterruptedIOException; 037import java.net.InetSocketAddress; 038import java.util.List; 039import java.util.concurrent.CountDownLatch; 040 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.CellScanner; 043import org.apache.hadoop.hbase.HBaseInterfaceAudience; 044import org.apache.hadoop.hbase.Server; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 049import org.apache.hadoop.hbase.regionserver.HRegionServer; 050import org.apache.hadoop.hbase.security.HBasePolicyProvider; 051import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 052import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 053import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 054import org.apache.hbase.thirdparty.com.google.protobuf.Message; 055import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 056import org.apache.hadoop.hbase.util.Pair; 057import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 058 059/** 060 * An RPC server with Netty4 implementation. 061 * @since 2.0.0 062 */ 063@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG}) 064public class NettyRpcServer extends RpcServer { 065 066 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class); 067 068 private final InetSocketAddress bindAddress; 069 070 private final CountDownLatch closed = new CountDownLatch(1); 071 private final Channel serverChannel; 072 private final ChannelGroup allChannels = 073 new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); 074 075 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, 076 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, 077 boolean reservoirEnabled) throws IOException { 078 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 079 this.bindAddress = bindAddress; 080 EventLoopGroup eventLoopGroup; 081 Class<? extends ServerChannel> channelClass; 082 if (server instanceof HRegionServer) { 083 NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig(); 084 eventLoopGroup = config.group(); 085 channelClass = config.serverChannelClass(); 086 } else { 087 eventLoopGroup = new NioEventLoopGroup(0, 088 new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY)); 089 channelClass = NioServerSocketChannel.class; 090 } 091 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) 092 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) 093 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) 094 .childHandler(new ChannelInitializer<Channel>() { 095 096 @Override 097 protected void initChannel(Channel ch) throws Exception { 098 ChannelPipeline pipeline = ch.pipeline(); 099 FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); 100 preambleDecoder.setSingleDecode(true); 101 pipeline.addLast("preambleDecoder", preambleDecoder); 102 pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); 103 pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); 104 pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); 105 pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); 106 } 107 }); 108 try { 109 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); 110 LOG.info("Bind to {}", serverChannel.localAddress()); 111 } catch (InterruptedException e) { 112 throw new InterruptedIOException(e.getMessage()); 113 } 114 initReconfigurable(conf); 115 this.scheduler.init(new RpcSchedulerContext(this)); 116 } 117 118 @VisibleForTesting 119 protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { 120 return new NettyRpcServerPreambleHandler(NettyRpcServer.this); 121 } 122 123 @Override 124 public synchronized void start() { 125 if (started) { 126 return; 127 } 128 authTokenSecretMgr = createSecretManager(); 129 if (authTokenSecretMgr != null) { 130 setSecretManager(authTokenSecretMgr); 131 authTokenSecretMgr.start(); 132 } 133 this.authManager = new ServiceAuthorizationManager(); 134 HBasePolicyProvider.init(conf, authManager); 135 scheduler.start(); 136 started = true; 137 } 138 139 @Override 140 public synchronized void stop() { 141 if (!running) { 142 return; 143 } 144 LOG.info("Stopping server on " + this.serverChannel.localAddress()); 145 if (authTokenSecretMgr != null) { 146 authTokenSecretMgr.stop(); 147 authTokenSecretMgr = null; 148 } 149 allChannels.close().awaitUninterruptibly(); 150 serverChannel.close(); 151 scheduler.stop(); 152 closed.countDown(); 153 running = false; 154 } 155 156 @Override 157 public synchronized void join() throws InterruptedException { 158 closed.await(); 159 } 160 161 @Override 162 public synchronized InetSocketAddress getListenerAddress() { 163 return ((InetSocketAddress) serverChannel.localAddress()); 164 } 165 166 @Override 167 public void setSocketSendBufSize(int size) { 168 } 169 170 @Override 171 public int getNumOpenConnections() { 172 int channelsCount = allChannels.size(); 173 // allChannels also contains the server channel, so exclude that from the count. 174 return channelsCount > 0 ? channelsCount - 1 : channelsCount; 175 } 176 177 @Override 178 public Pair<Message, CellScanner> call(BlockingService service, 179 MethodDescriptor md, Message param, CellScanner cellScanner, 180 long receiveTime, MonitoredRPCHandler status) throws IOException { 181 return call(service, md, param, cellScanner, receiveTime, status, 182 System.currentTimeMillis(), 0); 183 } 184 185 @Override 186 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 187 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, 188 long startTime, int timeout) throws IOException { 189 NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, 190 -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null); 191 return call(fakeCall, status); 192 } 193}