001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.net.InetSocketAddress; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.CellScanner; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.security.HBasePolicyProvider; 032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 033import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.hbase.util.ReflectionUtils; 036import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 045import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 046import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; 047import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator; 048import org.apache.hbase.thirdparty.io.netty.channel.Channel; 049import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 050import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 051import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 052import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 053import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; 054import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 055import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 056import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder; 057import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 058 059/** 060 * An RPC server with Netty4 implementation. 061 * @since 2.0.0 062 */ 063@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) 064public class NettyRpcServer extends RpcServer { 065 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class); 066 067 /** 068 * Name of property to change the byte buf allocator for the netty channels. Default is no value, 069 * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled", 070 * and "heap", or, the name of a class implementing ByteBufAllocator. 071 * <p> 072 * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is 073 * controlled by platform specific code and documented system properties. 074 * <p> 075 * "heap" will prefer heap arena allocations. 076 */ 077 public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator"; 078 static final String POOLED_ALLOCATOR_TYPE = "pooled"; 079 static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; 080 static final String HEAP_ALLOCATOR_TYPE = "heap"; 081 082 private final InetSocketAddress bindAddress; 083 084 private final CountDownLatch closed = new CountDownLatch(1); 085 private final Channel serverChannel; 086 final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); 087 private final ByteBufAllocator channelAllocator; 088 089 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, 090 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, 091 boolean reservoirEnabled) throws IOException { 092 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 093 this.bindAddress = bindAddress; 094 this.channelAllocator = getChannelAllocator(conf); 095 // Get the event loop group configuration from the server class if available. 096 NettyEventLoopGroupConfig config = null; 097 if (server instanceof HRegionServer) { 098 config = ((HRegionServer) server).getEventLoopGroupConfig(); 099 } 100 if (config == null) { 101 config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer"); 102 } 103 EventLoopGroup eventLoopGroup = config.group(); 104 Class<? extends ServerChannel> channelClass = config.serverChannelClass(); 105 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) 106 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) 107 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) 108 .childOption(ChannelOption.SO_REUSEADDR, true) 109 .childHandler(new ChannelInitializer<Channel>() { 110 @Override 111 protected void initChannel(Channel ch) throws Exception { 112 ch.config().setAllocator(channelAllocator); 113 ChannelPipeline pipeline = ch.pipeline(); 114 FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); 115 preambleDecoder.setSingleDecode(true); 116 pipeline.addLast("preambleDecoder", preambleDecoder); 117 pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); 118 pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); 119 pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); 120 pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); 121 } 122 }); 123 try { 124 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); 125 LOG.info("Bind to {}", serverChannel.localAddress()); 126 } catch (InterruptedException e) { 127 throw new InterruptedIOException(e.getMessage()); 128 } 129 initReconfigurable(conf); 130 this.scheduler.init(new RpcSchedulerContext(this)); 131 } 132 133 private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException { 134 final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY); 135 if (value != null) { 136 if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 137 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 138 return PooledByteBufAllocator.DEFAULT; 139 } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 140 LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName()); 141 return UnpooledByteBufAllocator.DEFAULT; 142 } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 143 LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName()); 144 return HeapByteBufAllocator.DEFAULT; 145 } else { 146 // If the value is none of the recognized labels, treat it as a class name. This allows the 147 // user to supply a custom implementation, perhaps for debugging. 148 try { 149 // ReflectionUtils throws UnsupportedOperationException if there are any problems. 150 ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value); 151 LOG.info("Using {} for buffer allocation", value); 152 return alloc; 153 } catch (ClassCastException | UnsupportedOperationException e) { 154 throw new IOException(e); 155 } 156 } 157 } else { 158 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 159 return PooledByteBufAllocator.DEFAULT; 160 } 161 } 162 163 @InterfaceAudience.Private 164 protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { 165 return new NettyRpcServerPreambleHandler(NettyRpcServer.this); 166 } 167 168 @Override 169 public synchronized void start() { 170 if (started) { 171 return; 172 } 173 authTokenSecretMgr = createSecretManager(); 174 if (authTokenSecretMgr != null) { 175 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in 176 // LeaderElector start. See HBASE-25875 177 synchronized (authTokenSecretMgr) { 178 setSecretManager(authTokenSecretMgr); 179 authTokenSecretMgr.start(); 180 } 181 } 182 this.authManager = new ServiceAuthorizationManager(); 183 HBasePolicyProvider.init(conf, authManager); 184 scheduler.start(); 185 started = true; 186 } 187 188 @Override 189 public synchronized void stop() { 190 if (!running) { 191 return; 192 } 193 LOG.info("Stopping server on " + this.serverChannel.localAddress()); 194 if (authTokenSecretMgr != null) { 195 authTokenSecretMgr.stop(); 196 authTokenSecretMgr = null; 197 } 198 allChannels.close().awaitUninterruptibly(); 199 serverChannel.close(); 200 scheduler.stop(); 201 closed.countDown(); 202 running = false; 203 } 204 205 @Override 206 public synchronized void join() throws InterruptedException { 207 closed.await(); 208 } 209 210 @Override 211 public synchronized InetSocketAddress getListenerAddress() { 212 return ((InetSocketAddress) serverChannel.localAddress()); 213 } 214 215 @Override 216 public void setSocketSendBufSize(int size) { 217 } 218 219 @Override 220 public int getNumOpenConnections() { 221 int channelsCount = allChannels.size(); 222 // allChannels also contains the server channel, so exclude that from the count. 223 return channelsCount > 0 ? channelsCount - 1 : channelsCount; 224 } 225 226 @Override 227 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 228 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) 229 throws IOException { 230 return call(service, md, param, cellScanner, receiveTime, status, 231 EnvironmentEdgeManager.currentTime(), 0); 232 } 233 234 @Override 235 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 236 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, 237 long startTime, int timeout) throws IOException { 238 NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, 239 -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null); 240 return call(fakeCall, status); 241 } 242}