/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;

/**
 * An RPC server with Netty4 implementation.
 * <p>
 * The listening socket is bound in the constructor; {@link #start()} then brings up the secret
 * manager, the authorization manager and the scheduler, and {@link #stop()} closes the client
 * channels and the server channel and releases any thread waiting in {@link #join()}.
 * @since 2.0.0
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
public class NettyRpcServer extends RpcServer {
  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);

  /**
   * Name of property to change netty rpc server eventloop thread count. Default is 0. Tests may set
   * this down from unlimited.
   */
  public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY =
    "hbase.netty.eventloop.rpcserver.thread.count";
  private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;

  /**
   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
   * and "heap", or, the name of a class implementing ByteBufAllocator.
   * <p>
   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
   * controlled by platform specific code and documented system properties.
   * <p>
   * "heap" will prefer heap arena allocations.
   */
  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
  static final String POOLED_ALLOCATOR_TYPE = "pooled";
  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
  static final String HEAP_ALLOCATOR_TYPE = "heap";

  private final InetSocketAddress bindAddress;

  /** Counted down exactly once by {@link #stop()}; {@link #join()} waits on it. */
  private final CountDownLatch closed = new CountDownLatch(1);
  private final Channel serverChannel;
  private final ChannelGroup allChannels =
    new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
  private final ByteBufAllocator channelAllocator;

  /**
   * Builds the Netty pipeline and binds the listening socket.
   * <p>
   * When {@code server} is an {@link HRegionServer} the event loop group and server channel class
   * come from its {@link NettyEventLoopGroupConfig}. Otherwise a dedicated
   * {@link NioEventLoopGroup} is created, sized from
   * {@link #HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY} (or the default when {@code server}
   * is {@code null}).
   * @param server           hosting server; may be {@code null}
   * @param name             passed through to {@link RpcServer}
   * @param services         passed through to {@link RpcServer}
   * @param bindAddress      address the server channel is bound to
   * @param conf             configuration; consulted for {@link #HBASE_NETTY_ALLOCATOR_KEY}
   * @param scheduler        passed through to {@link RpcServer} and initialized once bound
   * @param reservoirEnabled passed through to {@link RpcServer}
   * @throws IOException            if the configured allocator class cannot be instantiated
   * @throws InterruptedIOException if the thread is interrupted while waiting for the bind
   */
  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
    boolean reservoirEnabled) throws IOException {
    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
    this.bindAddress = bindAddress;
    this.channelAllocator = getChannelAllocator(conf);
    EventLoopGroup eventLoopGroup;
    Class<? extends ServerChannel> channelClass;
    if (server instanceof HRegionServer) {
      NettyEventLoopGroupConfig config = ((HBaseServerBase) server).getEventLoopGroupConfig();
      eventLoopGroup = config.group();
      channelClass = config.serverChannelClass();
    } else {
      int threadCount = server == null
        ? EVENTLOOP_THREADCOUNT_DEFAULT
        : server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
          EVENTLOOP_THREADCOUNT_DEFAULT);
      eventLoopGroup = new NioEventLoopGroup(threadCount,
        new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
      channelClass = NioServerSocketChannel.class;
    }
    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
      .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
      .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
      .childOption(ChannelOption.SO_REUSEADDR, true)
      .childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
          ch.config().setAllocator(channelAllocator);
          ChannelPipeline pipeline = ch.pipeline();
          // The first frame on a new connection is read as a single fixed-length 6-byte frame and
          // handed to the preamble handler.
          FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
          preambleDecoder.setSingleDecode(true);
          pipeline.addLast("preambleDecoder", preambleDecoder);
          pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
          pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
          pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
          pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
        }
      });
    try {
      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
      LOG.info("Bind to {}", serverChannel.localAddress());
    } catch (InterruptedException e) {
      // InterruptedIOException has no cause-taking constructor, so attach the cause explicitly
      // rather than losing the original stack trace.
      InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
      iioe.initCause(e);
      throw iioe;
    }
    initReconfigurable(conf);
    this.scheduler.init(new RpcSchedulerContext(this));
  }

  /**
   * Resolves the {@link ByteBufAllocator} to install on every accepted channel from
   * {@link #HBASE_NETTY_ALLOCATOR_KEY}.
   * @param conf configuration to read the allocator setting from
   * @return the pooled allocator when the key is unset or "pooled", the unpooled allocator for
   *         "unpooled", the heap allocator for "heap", otherwise a new instance of the class named
   *         by the value
   * @throws IOException if the value names a class that cannot be instantiated or that is not a
   *                     {@link ByteBufAllocator}
   */
  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
    final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
    if (value == null || POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
      return PooledByteBufAllocator.DEFAULT;
    }
    if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
      LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
      return UnpooledByteBufAllocator.DEFAULT;
    }
    if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
      LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
      return HeapByteBufAllocator.DEFAULT;
    }
    // If the value is none of the recognized labels, treat it as a class name. This allows the
    // user to supply a custom implementation, perhaps for debugging.
    try {
      // ReflectionUtils throws UnsupportedOperationException if there are any problems.
      ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
      LOG.info("Using {} for buffer allocation", value);
      return alloc;
    } catch (ClassCastException | UnsupportedOperationException e) {
      throw new IOException(e);
    }
  }

  /**
   * Factory for the preamble handler installed on each accepted channel; protected so subclasses
   * can substitute their own handler.
   */
  @InterfaceAudience.Private
  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
    return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
  }

  /**
   * Starts the secret manager (if one is created), the authorization manager and the scheduler.
   * Calling this again after a successful start is a no-op.
   */
  @Override
  public synchronized void start() {
    if (started) {
      return;
    }
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
      // LeaderElector start. See HBASE-25875
      synchronized (authTokenSecretMgr) {
        setSecretManager(authTokenSecretMgr);
        authTokenSecretMgr.start();
      }
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    scheduler.start();
    started = true;
  }

  /**
   * Stops the secret manager, closes all client channels (waiting for them to close), closes the
   * server channel, stops the scheduler and releases any thread blocked in {@link #join()}. Does
   * nothing when the server is not running.
   */
  @Override
  public synchronized void stop() {
    if (!running) {
      return;
    }
    LOG.info("Stopping server on {}", this.serverChannel.localAddress());
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    allChannels.close().awaitUninterruptibly();
    serverChannel.close();
    scheduler.stop();
    closed.countDown();
    running = false;
  }

  /**
   * Blocks until {@link #stop()} has completed its shutdown sequence.
   * <p>
   * Deliberately NOT synchronized: {@link #stop()} holds this object's monitor while it counts the
   * latch down, so waiting on the latch while also holding the monitor would prevent any other
   * thread from ever running {@link #stop()} and this call would never return.
   * @throws InterruptedException if the waiting thread is interrupted
   */
  @Override
  public void join() throws InterruptedException {
    closed.await();
  }

  /** Returns the local address the server channel is bound to. */
  @Override
  public synchronized InetSocketAddress getListenerAddress() {
    return ((InetSocketAddress) serverChannel.localAddress());
  }

  /** No-op in this implementation. */
  @Override
  public void setSocketSendBufSize(int size) {
  }

  /**
   * Returns the number of channels currently registered in the client channel group.
   */
  @Override
  public int getNumOpenConnections() {
    // This class never adds serverChannel to allChannels; the group is only handed to the request
    // decoder installed on accepted (child) channels, so there is no server channel to subtract.
    // NOTE(review): confirm NettyRpcServerRequestDecoder registers only the accepted channel.
    return allChannels.size();
  }
}