001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE; 021import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED; 022import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT; 023import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_WRAP_SIZE; 024 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.net.InetSocketAddress; 028import java.net.SocketAddress; 029import java.security.cert.Certificate; 030import java.security.cert.X509Certificate; 031import java.util.List; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.atomic.AtomicReference; 034import javax.net.ssl.SSLPeerUnverifiedException; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.HBaseServerBase; 038import org.apache.hadoop.hbase.Server; 039import org.apache.hadoop.hbase.exceptions.X509Exception; 040import org.apache.hadoop.hbase.io.FileChangeWatcher; 041import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 042import org.apache.hadoop.hbase.security.HBasePolicyProvider; 043import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 044import org.apache.hadoop.hbase.util.NettyUnsafeUtils; 045import org.apache.hadoop.hbase.util.Pair; 046import org.apache.hadoop.hbase.util.ReflectionUtils; 047import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 053import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 054import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; 055import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator; 056import org.apache.hbase.thirdparty.io.netty.channel.Channel; 057import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 058import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 059import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 060import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 061import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; 062import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; 063import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 064import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 065import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler; 066import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; 067import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; 068import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 069 070/** 071 * An RPC server with Netty4 implementation. 072 * @since 2.0.0 073 */ 074@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) 075public class NettyRpcServer extends RpcServer { 076 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class); 077 078 /** 079 * Name of property to change the byte buf allocator for the netty channels. Default is no value, 080 * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled", 081 * and "heap", or, the name of a class implementing ByteBufAllocator. 082 * <p> 083 * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is 084 * controlled by platform specific code and documented system properties. 085 * <p> 086 * "heap" will prefer heap arena allocations. 087 */ 088 public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator"; 089 static final String POOLED_ALLOCATOR_TYPE = "pooled"; 090 static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; 091 static final String HEAP_ALLOCATOR_TYPE = "heap"; 092 093 /** 094 * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was 095 * exceeded, channel will have setAutoRead to true again. The server will start reading incoming 096 * bytes (requests) from the client channel. 097 */ 098 public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY = 099 "hbase.server.netty.writable.watermark.low"; 100 private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0; 101 102 /** 103 * High watermark for pending outbound bytes of a single netty channel. If the number of pending 104 * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server 105 * will stop reading incoming requests from the client channel. 106 * <p> 107 * Note: any requests already in the call queue will still be processed. 108 */ 109 public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY = 110 "hbase.server.netty.writable.watermark.high"; 111 private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0; 112 113 /** 114 * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending 115 * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory 116 * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight 117 * requests. 118 * <p> 119 * Note: must be higher than the high watermark, otherwise it's ignored. 120 */ 121 public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY = 122 "hbase.server.netty.writable.watermark.fatal"; 123 private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0; 124 125 private final InetSocketAddress bindAddress; 126 127 private final CountDownLatch closed = new CountDownLatch(1); 128 private final Channel serverChannel; 129 final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); 130 private final ByteBufAllocator channelAllocator; 131 private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>(); 132 private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>(); 133 private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>(); 134 135 private volatile int writeBufferFatalThreshold; 136 private volatile WriteBufferWaterMark writeBufferWaterMark; 137 138 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, 139 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, 140 boolean reservoirEnabled) throws IOException { 141 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 142 this.bindAddress = bindAddress; 143 this.channelAllocator = getChannelAllocator(conf); 144 // Get the event loop group configuration from the server class if available. 145 NettyEventLoopGroupConfig config = null; 146 if (server instanceof HBaseServerBase) { 147 config = ((HBaseServerBase<?>) server).getEventLoopGroupConfig(); 148 } 149 if (config == null) { 150 config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer"); 151 } 152 153 // call before creating bootstrap below so that the necessary configs can be set 154 configureNettyWatermarks(conf); 155 156 EventLoopGroup eventLoopGroup = config.group(); 157 Class<? extends ServerChannel> channelClass = config.serverChannelClass(); 158 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) 159 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) 160 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) 161 .childOption(ChannelOption.SO_REUSEADDR, true) 162 .childHandler(new ChannelInitializer<Channel>() { 163 @Override 164 protected void initChannel(Channel ch) throws Exception { 165 ch.config().setWriteBufferWaterMark(writeBufferWaterMark); 166 ch.config().setAllocator(channelAllocator); 167 ChannelPipeline pipeline = ch.pipeline(); 168 169 NettyServerRpcConnection conn = createNettyServerRpcConnection(ch); 170 171 if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) { 172 initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true)); 173 } 174 pipeline 175 .addLast(NettyRpcServerPreambleHandler.DECODER_NAME, 176 NettyRpcServerPreambleHandler.createDecoder()) 177 .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn)) 178 // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may 179 // send RpcResponse to client. 180 .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics)) 181 // Add writability handler after the response encoder, so we can abort writes before 182 // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so 183 // that the handler configs can be live updated via update_config. 184 .addLast(NettyRpcServerChannelWritabilityHandler.NAME, 185 new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold, 186 () -> isWritabilityBackpressureEnabled())); 187 } 188 }); 189 try { 190 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); 191 LOG.info("Bind to {}", serverChannel.localAddress()); 192 } catch (InterruptedException e) { 193 throw new InterruptedIOException(e.getMessage()); 194 } 195 initReconfigurable(conf); 196 this.scheduler.init(new RpcSchedulerContext(this)); 197 } 198 199 @Override 200 public void onConfigurationChange(Configuration newConf) { 201 super.onConfigurationChange(newConf); 202 configureNettyWatermarks(newConf); 203 } 204 205 private void configureNettyWatermarks(Configuration conf) { 206 int watermarkLow = 207 conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT); 208 int watermarkHigh = 209 conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT); 210 int fatalThreshold = 211 conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT); 212 213 WriteBufferWaterMark oldWaterMark = writeBufferWaterMark; 214 int oldFatalThreshold = writeBufferFatalThreshold; 215 216 boolean disabled = false; 217 if (watermarkHigh == 0 && watermarkLow == 0) { 218 // if both are 0, use the netty default, which we will treat as "disabled". 219 // when disabled, we won't manage autoRead in response to writability changes. 220 writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; 221 disabled = true; 222 } else { 223 // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to 224 // 1 to avoid confusing behavior. 225 if (watermarkLow == 0) { 226 LOG.warn( 227 "Detected a {} value of 0, which is impossible to achieve " 228 + "due to how netty evaluates these thresholds, setting to 1", 229 CHANNEL_WRITABLE_LOW_WATERMARK_KEY); 230 watermarkLow = 1; 231 } 232 233 // netty validates the watermarks and throws an exception if high < low, fail more gracefully 234 // by disabling the watermarks and warning. 235 if (watermarkHigh <= watermarkLow) { 236 LOG.warn( 237 "Detected {} value {}, lower than {} value {}. This will fail netty validation, " 238 + "so disabling", 239 CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 240 watermarkLow); 241 writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; 242 } else { 243 writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh); 244 } 245 246 // only apply this check when watermark is enabled. this way we give the operator some 247 // flexibility if they want to try enabling fatal threshold without backpressure. 248 if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) { 249 LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.", 250 CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 251 watermarkHigh); 252 fatalThreshold = 0; 253 } 254 } 255 256 writeBufferFatalThreshold = fatalThreshold; 257 258 if ( 259 oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low() 260 || oldWaterMark.high() != writeBufferWaterMark.high() 261 || oldFatalThreshold != writeBufferFatalThreshold) 262 ) { 263 LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}", 264 disabled ? "disabled" : writeBufferWaterMark.low(), 265 disabled ? "disabled" : writeBufferWaterMark.high(), 266 writeBufferFatalThreshold <= 0 ? "disabled" : writeBufferFatalThreshold); 267 } 268 269 // update any existing channels 270 for (Channel channel : allChannels) { 271 channel.config().setWriteBufferWaterMark(writeBufferWaterMark); 272 // if disabling watermark, set auto read to true in case channel had been exceeding 273 // previous watermark 274 if (disabled) { 275 channel.config().setAutoRead(true); 276 } 277 } 278 } 279 280 public boolean isWritabilityBackpressureEnabled() { 281 return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT; 282 } 283 284 private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException { 285 final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY); 286 if (value != null) { 287 if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 288 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 289 return PooledByteBufAllocator.DEFAULT; 290 } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 291 LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName()); 292 return UnpooledByteBufAllocator.DEFAULT; 293 } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 294 LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName()); 295 return HeapByteBufAllocator.DEFAULT; 296 } else { 297 // If the value is none of the recognized labels, treat it as a class name. This allows the 298 // user to supply a custom implementation, perhaps for debugging. 299 try { 300 // ReflectionUtils throws UnsupportedOperationException if there are any problems. 301 ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value); 302 LOG.info("Using {} for buffer allocation", value); 303 return alloc; 304 } catch (ClassCastException | UnsupportedOperationException e) { 305 throw new IOException(e); 306 } 307 } 308 } else { 309 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 310 return PooledByteBufAllocator.DEFAULT; 311 } 312 } 313 314 // will be overridden in tests 315 @InterfaceAudience.Private 316 protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { 317 return new NettyServerRpcConnection(NettyRpcServer.this, channel); 318 } 319 320 @Override 321 public synchronized void start() { 322 if (started) { 323 return; 324 } 325 authTokenSecretMgr = createSecretManager(); 326 if (authTokenSecretMgr != null) { 327 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in 328 // LeaderElector start. See HBASE-25875 329 synchronized (authTokenSecretMgr) { 330 setSecretManager(authTokenSecretMgr); 331 authTokenSecretMgr.start(); 332 } 333 } 334 this.authManager = new ServiceAuthorizationManager(); 335 HBasePolicyProvider.init(conf, authManager); 336 scheduler.start(); 337 started = true; 338 } 339 340 @Override 341 public synchronized void stop() { 342 if (!running) { 343 return; 344 } 345 LOG.info("Stopping server on " + this.serverChannel.localAddress()); 346 FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); 347 if (ks != null) { 348 ks.stop(); 349 } 350 FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); 351 if (ts != null) { 352 ts.stop(); 353 } 354 if (authTokenSecretMgr != null) { 355 authTokenSecretMgr.stop(); 356 authTokenSecretMgr = null; 357 } 358 allChannels.close().awaitUninterruptibly(); 359 serverChannel.close(); 360 scheduler.stop(); 361 closed.countDown(); 362 running = false; 363 } 364 365 @Override 366 public synchronized void join() throws InterruptedException { 367 closed.await(); 368 } 369 370 @Override 371 public synchronized InetSocketAddress getListenerAddress() { 372 return ((InetSocketAddress) serverChannel.localAddress()); 373 } 374 375 @Override 376 public void setSocketSendBufSize(int size) { 377 } 378 379 @Override 380 public int getNumOpenConnections() { 381 return allChannels.size(); 382 } 383 384 private void initSSL(ChannelPipeline p, NettyServerRpcConnection conn, boolean supportPlaintext) 385 throws X509Exception, IOException { 386 SslContext nettySslContext = getSslContext(); 387 388 /* 389 * our HostnameVerifier gets the host name from SSLEngine, so we have to construct the engine 390 * properly by passing the remote address 391 */ 392 393 if (supportPlaintext) { 394 SocketAddress remoteAddress = p.channel().remoteAddress(); 395 OptionalSslHandler optionalSslHandler; 396 397 if (remoteAddress instanceof InetSocketAddress) { 398 InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress; 399 optionalSslHandler = new OptionalSslHandlerWithHostPort(nettySslContext, 400 remoteInetAddress.getHostString(), remoteInetAddress.getPort()); 401 } else { 402 optionalSslHandler = new OptionalSslHandler(nettySslContext); 403 } 404 405 p.addLast("ssl", optionalSslHandler); 406 LOG.debug("Dual mode SSL handler added for channel: {}", p.channel()); 407 } else { 408 SocketAddress remoteAddress = p.channel().remoteAddress(); 409 SslHandler sslHandler; 410 411 if (remoteAddress instanceof InetSocketAddress) { 412 InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress; 413 sslHandler = nettySslContext.newHandler(p.channel().alloc(), 414 remoteInetAddress.getHostString(), remoteInetAddress.getPort()); 415 } else { 416 sslHandler = nettySslContext.newHandler(p.channel().alloc()); 417 } 418 419 sslHandler.setWrapDataSize( 420 conf.getInt(HBASE_SERVER_NETTY_TLS_WRAP_SIZE, DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE)); 421 422 sslHandler.handshakeFuture() 423 .addListener(future -> sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress)); 424 425 p.addLast("ssl", sslHandler); 426 LOG.debug("SSL handler added for channel: {}", p.channel()); 427 } 428 } 429 430 static void sslHandshakeCompleteHandler(NettyServerRpcConnection conn, SslHandler sslHandler, 431 SocketAddress remoteAddress) { 432 try { 433 Certificate[] certificates = sslHandler.engine().getSession().getPeerCertificates(); 434 if (certificates != null && certificates.length > 0) { 435 X509Certificate[] x509Certificates = new X509Certificate[certificates.length]; 436 for (int i = 0; i < x509Certificates.length; i++) { 437 x509Certificates[i] = (X509Certificate) certificates[i]; 438 } 439 conn.clientCertificateChain = x509Certificates; 440 } else if (sslHandler.engine().getNeedClientAuth()) { 441 LOG.debug( 442 "Could not get peer certificate on TLS connection from {}, although one is required", 443 remoteAddress); 444 } 445 } catch (SSLPeerUnverifiedException e) { 446 if (sslHandler.engine().getNeedClientAuth()) { 447 LOG.debug( 448 "Could not get peer certificate on TLS connection from {}, although one is required", 449 remoteAddress, e); 450 } 451 } catch (Exception e) { 452 LOG.debug("Unexpected error getting peer certificate for TLS connection from {}", 453 remoteAddress, e); 454 } 455 } 456 457 SslContext getSslContext() throws X509Exception, IOException { 458 SslContext result = sslContextForServer.get(); 459 if (result == null) { 460 result = X509Util.createSslContextForServer(conf); 461 if (!sslContextForServer.compareAndSet(null, result)) { 462 // lost the race, another thread already set the value 463 result = sslContextForServer.get(); 464 } else if ( 465 keyStoreWatcher.get() == null && trustStoreWatcher.get() == null 466 && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) 467 ) { 468 X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, 469 () -> sslContextForServer.set(null)); 470 } 471 } 472 return result; 473 } 474 475 public int getWriteBufferFatalThreshold() { 476 return writeBufferFatalThreshold; 477 } 478 479 public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() { 480 long total = 0; 481 long max = 0; 482 for (Channel channel : allChannels) { 483 long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel); 484 total += outbound; 485 max = Math.max(max, outbound); 486 } 487 return Pair.newPair(total, max); 488 } 489}