001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.net.InetSocketAddress; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.CellScanner; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.security.HBasePolicyProvider; 032import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 039import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 040import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 041import org.apache.hbase.thirdparty.com.google.protobuf.Message; 042import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 043import org.apache.hbase.thirdparty.io.netty.channel.Channel; 044import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 045import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 046import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 047import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 048import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; 049import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 050import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 051import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 052import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel; 053import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder; 054import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; 055import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 056 057/** 058 * An RPC server with Netty4 implementation. 059 * @since 2.0.0 060 */ 061@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG}) 062public class NettyRpcServer extends RpcServer { 063 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class); 064 065 /** 066 * Name of property to change netty rpc server eventloop thread count. Default is 0. 067 * Tests may set this down from unlimited. 068 */ 069 public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY = 070 "hbase.netty.eventloop.rpcserver.thread.count"; 071 private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0; 072 073 private final InetSocketAddress bindAddress; 074 075 private final CountDownLatch closed = new CountDownLatch(1); 076 private final Channel serverChannel; 077 private final ChannelGroup allChannels = 078 new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); 079 080 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, 081 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, 082 boolean reservoirEnabled) throws IOException { 083 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 084 this.bindAddress = bindAddress; 085 EventLoopGroup eventLoopGroup; 086 Class<? extends ServerChannel> channelClass; 087 if (server instanceof HRegionServer) { 088 NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig(); 089 eventLoopGroup = config.group(); 090 channelClass = config.serverChannelClass(); 091 } else { 092 int threadCount = server == null? EVENTLOOP_THREADCOUNT_DEFAULT: 093 server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY, 094 EVENTLOOP_THREADCOUNT_DEFAULT); 095 eventLoopGroup = new NioEventLoopGroup(threadCount, 096 new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY)); 097 channelClass = NioServerSocketChannel.class; 098 } 099 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) 100 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) 101 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) 102 .childOption(ChannelOption.SO_REUSEADDR, true) 103 .childHandler(new ChannelInitializer<Channel>() { 104 105 @Override 106 protected void initChannel(Channel ch) throws Exception { 107 ChannelPipeline pipeline = ch.pipeline(); 108 FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); 109 preambleDecoder.setSingleDecode(true); 110 pipeline.addLast("preambleDecoder", preambleDecoder); 111 pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); 112 pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); 113 pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); 114 pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); 115 } 116 }); 117 try { 118 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); 119 LOG.info("Bind to {}", serverChannel.localAddress()); 120 } catch (InterruptedException e) { 121 throw new InterruptedIOException(e.getMessage()); 122 } 123 initReconfigurable(conf); 124 this.scheduler.init(new RpcSchedulerContext(this)); 125 } 126 127 @VisibleForTesting 128 protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { 129 return new NettyRpcServerPreambleHandler(NettyRpcServer.this); 130 } 131 132 @Override 133 public synchronized void start() { 134 if (started) { 135 return; 136 } 137 authTokenSecretMgr = createSecretManager(); 138 if (authTokenSecretMgr != null) { 139 setSecretManager(authTokenSecretMgr); 140 authTokenSecretMgr.start(); 141 } 142 this.authManager = new ServiceAuthorizationManager(); 143 HBasePolicyProvider.init(conf, authManager); 144 scheduler.start(); 145 started = true; 146 } 147 148 @Override 149 public synchronized void stop() { 150 if (!running) { 151 return; 152 } 153 LOG.info("Stopping server on " + this.serverChannel.localAddress()); 154 if (authTokenSecretMgr != null) { 155 authTokenSecretMgr.stop(); 156 authTokenSecretMgr = null; 157 } 158 allChannels.close().awaitUninterruptibly(); 159 serverChannel.close(); 160 scheduler.stop(); 161 closed.countDown(); 162 running = false; 163 } 164 165 @Override 166 public synchronized void join() throws InterruptedException { 167 closed.await(); 168 } 169 170 @Override 171 public synchronized InetSocketAddress getListenerAddress() { 172 return ((InetSocketAddress) serverChannel.localAddress()); 173 } 174 175 @Override 176 public void setSocketSendBufSize(int size) { 177 } 178 179 @Override 180 public int getNumOpenConnections() { 181 int channelsCount = allChannels.size(); 182 // allChannels also contains the server channel, so exclude that from the count. 183 return channelsCount > 0 ? channelsCount - 1 : channelsCount; 184 } 185 186 @Override 187 public Pair<Message, CellScanner> call(BlockingService service, 188 MethodDescriptor md, Message param, CellScanner cellScanner, 189 long receiveTime, MonitoredRPCHandler status) throws IOException { 190 return call(service, md, param, cellScanner, receiveTime, status, 191 System.currentTimeMillis(), 0); 192 } 193 194 @Override 195 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 196 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, 197 long startTime, int timeout) throws IOException { 198 NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, 199 -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null); 200 return call(fakeCall, status); 201 } 202}