001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE; 021import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED; 022import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT; 023import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_WRAP_SIZE; 024import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED; 025 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.net.InetSocketAddress; 029import java.net.SocketAddress; 030import java.security.cert.Certificate; 031import java.security.cert.X509Certificate; 032import java.util.List; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.atomic.AtomicReference; 035import javax.net.ssl.SSLPeerUnverifiedException; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseInterfaceAudience; 038import org.apache.hadoop.hbase.HBaseServerBase; 039import org.apache.hadoop.hbase.Server; 040import org.apache.hadoop.hbase.exceptions.X509Exception; 041import org.apache.hadoop.hbase.io.FileChangeWatcher; 042import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 043import org.apache.hadoop.hbase.security.HBasePolicyProvider; 044import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 045import org.apache.hadoop.hbase.util.NettyUnsafeUtils; 046import org.apache.hadoop.hbase.util.Pair; 047import org.apache.hadoop.hbase.util.ReflectionUtils; 048import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 054import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 055import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; 056import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator; 057import org.apache.hbase.thirdparty.io.netty.channel.Channel; 058import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 060import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 061import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 062import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; 063import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; 064import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 065import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 066import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler; 067import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; 068import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; 069import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 070 071/** 072 * An RPC server with Netty4 implementation. 073 * @since 2.0.0 074 */ 075@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) 076public class NettyRpcServer extends RpcServer { 077 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class); 078 079 /** 080 * Name of property to change the byte buf allocator for the netty channels. Default is no value, 081 * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled", 082 * and "heap", or, the name of a class implementing ByteBufAllocator. 083 * <p> 084 * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is 085 * controlled by platform specific code and documented system properties. 086 * <p> 087 * "heap" will prefer heap arena allocations. 088 */ 089 public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator"; 090 static final String POOLED_ALLOCATOR_TYPE = "pooled"; 091 static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; 092 static final String HEAP_ALLOCATOR_TYPE = "heap"; 093 094 /** 095 * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was 096 * exceeded, channel will have setAutoRead to true again. The server will start reading incoming 097 * bytes (requests) from the client channel. 098 */ 099 public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY = 100 "hbase.server.netty.writable.watermark.low"; 101 private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0; 102 103 /** 104 * High watermark for pending outbound bytes of a single netty channel. If the number of pending 105 * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server 106 * will stop reading incoming requests from the client channel. 107 * <p> 108 * Note: any requests already in the call queue will still be processed. 109 */ 110 public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY = 111 "hbase.server.netty.writable.watermark.high"; 112 private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0; 113 114 /** 115 * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending 116 * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory 117 * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight 118 * requests. 119 * <p> 120 * Note: must be higher than the high watermark, otherwise it's ignored. 121 */ 122 public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY = 123 "hbase.server.netty.writable.watermark.fatal"; 124 private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0; 125 126 private final InetSocketAddress bindAddress; 127 128 private final CountDownLatch closed = new CountDownLatch(1); 129 private final Channel serverChannel; 130 final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); 131 private final ByteBufAllocator channelAllocator; 132 private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>(); 133 private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>(); 134 private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>(); 135 136 private volatile int writeBufferFatalThreshold; 137 private volatile WriteBufferWaterMark writeBufferWaterMark; 138 139 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, 140 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, 141 boolean reservoirEnabled) throws IOException { 142 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 143 this.bindAddress = bindAddress; 144 this.channelAllocator = getChannelAllocator(conf); 145 // Get the event loop group configuration from the server class if available. 146 NettyEventLoopGroupConfig config = null; 147 if (server instanceof HBaseServerBase) { 148 config = ((HBaseServerBase<?>) server).getEventLoopGroupConfig(); 149 } 150 if (config == null) { 151 config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer"); 152 } 153 154 // call before creating bootstrap below so that the necessary configs can be set 155 configureNettyWatermarks(conf); 156 157 EventLoopGroup eventLoopGroup = config.group(); 158 Class<? extends ServerChannel> channelClass = config.serverChannelClass(); 159 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) 160 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) 161 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) 162 .childOption(ChannelOption.SO_REUSEADDR, true) 163 .childHandler(new ChannelInitializer<Channel>() { 164 @Override 165 protected void initChannel(Channel ch) throws Exception { 166 ch.config().setWriteBufferWaterMark(writeBufferWaterMark); 167 ch.config().setAllocator(channelAllocator); 168 ChannelPipeline pipeline = ch.pipeline(); 169 170 NettyServerRpcConnection conn = createNettyServerRpcConnection(ch); 171 172 if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) { 173 initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true)); 174 } 175 pipeline 176 .addLast(NettyRpcServerPreambleHandler.DECODER_NAME, 177 NettyRpcServerPreambleHandler.createDecoder()) 178 .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn)) 179 // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may 180 // send RpcResponse to client. 181 .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics)) 182 // Add writability handler after the response encoder, so we can abort writes before 183 // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so 184 // that the handler configs can be live updated via update_config. 185 .addLast(NettyRpcServerChannelWritabilityHandler.NAME, 186 new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold, 187 () -> isWritabilityBackpressureEnabled())); 188 } 189 }); 190 try { 191 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); 192 LOG.info("Bind to {}", serverChannel.localAddress()); 193 } catch (InterruptedException e) { 194 throw new InterruptedIOException(e.getMessage()); 195 } 196 initReconfigurable(conf); 197 this.scheduler.init(new RpcSchedulerContext(this)); 198 } 199 200 @Override 201 public void onConfigurationChange(Configuration newConf) { 202 super.onConfigurationChange(newConf); 203 configureNettyWatermarks(newConf); 204 } 205 206 private void configureNettyWatermarks(Configuration conf) { 207 int watermarkLow = 208 conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT); 209 int watermarkHigh = 210 conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT); 211 int fatalThreshold = 212 conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT); 213 214 WriteBufferWaterMark oldWaterMark = writeBufferWaterMark; 215 int oldFatalThreshold = writeBufferFatalThreshold; 216 217 boolean disabled = false; 218 if (watermarkHigh == 0 && watermarkLow == 0) { 219 // if both are 0, use the netty default, which we will treat as "disabled". 220 // when disabled, we won't manage autoRead in response to writability changes. 221 writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; 222 disabled = true; 223 } else { 224 // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to 225 // 1 to avoid confusing behavior. 226 if (watermarkLow == 0) { 227 LOG.warn( 228 "Detected a {} value of 0, which is impossible to achieve " 229 + "due to how netty evaluates these thresholds, setting to 1", 230 CHANNEL_WRITABLE_LOW_WATERMARK_KEY); 231 watermarkLow = 1; 232 } 233 234 // netty validates the watermarks and throws an exception if high < low, fail more gracefully 235 // by disabling the watermarks and warning. 236 if (watermarkHigh <= watermarkLow) { 237 LOG.warn( 238 "Detected {} value {}, lower than {} value {}. This will fail netty validation, " 239 + "so disabling", 240 CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 241 watermarkLow); 242 writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; 243 } else { 244 writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh); 245 } 246 247 // only apply this check when watermark is enabled. this way we give the operator some 248 // flexibility if they want to try enabling fatal threshold without backpressure. 249 if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) { 250 LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.", 251 CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 252 watermarkHigh); 253 fatalThreshold = 0; 254 } 255 } 256 257 writeBufferFatalThreshold = fatalThreshold; 258 259 if ( 260 oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low() 261 || oldWaterMark.high() != writeBufferWaterMark.high() 262 || oldFatalThreshold != writeBufferFatalThreshold) 263 ) { 264 LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}", 265 disabled ? "disabled" : writeBufferWaterMark.low(), 266 disabled ? "disabled" : writeBufferWaterMark.high(), 267 writeBufferFatalThreshold <= 0 ? "disabled" : writeBufferFatalThreshold); 268 } 269 270 // update any existing channels 271 for (Channel channel : allChannels) { 272 channel.config().setWriteBufferWaterMark(writeBufferWaterMark); 273 // if disabling watermark, set auto read to true in case channel had been exceeding 274 // previous watermark 275 if (disabled) { 276 channel.config().setAutoRead(true); 277 } 278 } 279 } 280 281 public boolean isWritabilityBackpressureEnabled() { 282 return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT; 283 } 284 285 private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException { 286 final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY); 287 if (value != null) { 288 if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 289 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 290 return PooledByteBufAllocator.DEFAULT; 291 } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 292 LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName()); 293 return UnpooledByteBufAllocator.DEFAULT; 294 } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 295 LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName()); 296 return HeapByteBufAllocator.DEFAULT; 297 } else { 298 // If the value is none of the recognized labels, treat it as a class name. This allows the 299 // user to supply a custom implementation, perhaps for debugging. 300 try { 301 // ReflectionUtils throws UnsupportedOperationException if there are any problems. 302 ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value); 303 LOG.info("Using {} for buffer allocation", value); 304 return alloc; 305 } catch (ClassCastException | UnsupportedOperationException e) { 306 throw new IOException(e); 307 } 308 } 309 } else { 310 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 311 return PooledByteBufAllocator.DEFAULT; 312 } 313 } 314 315 // will be overridden in tests 316 @InterfaceAudience.Private 317 protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { 318 return new NettyServerRpcConnection(NettyRpcServer.this, channel); 319 } 320 321 @Override 322 public synchronized void start() { 323 if (started) { 324 return; 325 } 326 authTokenSecretMgr = createSecretManager(); 327 if (authTokenSecretMgr != null) { 328 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in 329 // LeaderElector start. See HBASE-25875 330 synchronized (authTokenSecretMgr) { 331 setSecretManager(authTokenSecretMgr); 332 authTokenSecretMgr.start(); 333 } 334 } 335 this.authManager = new ServiceAuthorizationManager(); 336 HBasePolicyProvider.init(conf, authManager); 337 scheduler.start(); 338 started = true; 339 } 340 341 @Override 342 public synchronized void stop() { 343 if (!running) { 344 return; 345 } 346 LOG.info("Stopping server on " + this.serverChannel.localAddress()); 347 FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); 348 if (ks != null) { 349 ks.stop(); 350 } 351 FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); 352 if (ts != null) { 353 ts.stop(); 354 } 355 if (authTokenSecretMgr != null) { 356 authTokenSecretMgr.stop(); 357 authTokenSecretMgr = null; 358 } 359 allChannels.close().awaitUninterruptibly(); 360 serverChannel.close(); 361 scheduler.stop(); 362 closed.countDown(); 363 running = false; 364 } 365 366 @Override 367 public synchronized void join() throws InterruptedException { 368 closed.await(); 369 } 370 371 @Override 372 public synchronized InetSocketAddress getListenerAddress() { 373 return ((InetSocketAddress) serverChannel.localAddress()); 374 } 375 376 @Override 377 public void setSocketSendBufSize(int size) { 378 } 379 380 @Override 381 public int getNumOpenConnections() { 382 return allChannels.size(); 383 } 384 385 private void initSSL(ChannelPipeline p, NettyServerRpcConnection conn, boolean supportPlaintext) 386 throws X509Exception, IOException { 387 SslContext nettySslContext = getSslContext(); 388 389 if (supportPlaintext) { 390 p.addLast("ssl", new OptionalSslHandler(nettySslContext)); 391 LOG.debug("Dual mode SSL handler added for channel: {}", p.channel()); 392 } else { 393 SocketAddress remoteAddress = p.channel().remoteAddress(); 394 SslHandler sslHandler; 395 396 if (remoteAddress instanceof InetSocketAddress) { 397 InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress; 398 String host; 399 400 if (conf.getBoolean(TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED, true)) { 401 host = remoteInetAddress.getHostName(); 402 } else { 403 host = remoteInetAddress.getHostString(); 404 } 405 406 int port = remoteInetAddress.getPort(); 407 408 /* 409 * our HostnameVerifier gets the host name from SSLEngine, so we have to construct the 410 * engine properly by passing the remote address 411 */ 412 sslHandler = nettySslContext.newHandler(p.channel().alloc(), host, port); 413 } else { 414 sslHandler = nettySslContext.newHandler(p.channel().alloc()); 415 } 416 417 sslHandler.setWrapDataSize( 418 conf.getInt(HBASE_SERVER_NETTY_TLS_WRAP_SIZE, DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE)); 419 420 sslHandler.handshakeFuture() 421 .addListener(future -> sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress)); 422 423 p.addLast("ssl", sslHandler); 424 LOG.debug("SSL handler added for channel: {}", p.channel()); 425 } 426 } 427 428 static void sslHandshakeCompleteHandler(NettyServerRpcConnection conn, SslHandler sslHandler, 429 SocketAddress remoteAddress) { 430 try { 431 Certificate[] certificates = sslHandler.engine().getSession().getPeerCertificates(); 432 if (certificates != null && certificates.length > 0) { 433 X509Certificate[] x509Certificates = new X509Certificate[certificates.length]; 434 for (int i = 0; i < x509Certificates.length; i++) { 435 x509Certificates[i] = (X509Certificate) certificates[i]; 436 } 437 conn.clientCertificateChain = x509Certificates; 438 } else if (sslHandler.engine().getNeedClientAuth()) { 439 LOG.debug( 440 "Could not get peer certificate on TLS connection from {}, although one is required", 441 remoteAddress); 442 } 443 } catch (SSLPeerUnverifiedException e) { 444 if (sslHandler.engine().getNeedClientAuth()) { 445 LOG.debug( 446 "Could not get peer certificate on TLS connection from {}, although one is required", 447 remoteAddress, e); 448 } 449 } catch (Exception e) { 450 LOG.debug("Unexpected error getting peer certificate for TLS connection from {}", 451 remoteAddress, e); 452 } 453 } 454 455 SslContext getSslContext() throws X509Exception, IOException { 456 SslContext result = sslContextForServer.get(); 457 if (result == null) { 458 result = X509Util.createSslContextForServer(conf); 459 if (!sslContextForServer.compareAndSet(null, result)) { 460 // lost the race, another thread already set the value 461 result = sslContextForServer.get(); 462 } else if ( 463 keyStoreWatcher.get() == null && trustStoreWatcher.get() == null 464 && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) 465 ) { 466 X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, 467 () -> sslContextForServer.set(null)); 468 } 469 } 470 return result; 471 } 472 473 public int getWriteBufferFatalThreshold() { 474 return writeBufferFatalThreshold; 475 } 476 477 public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() { 478 long total = 0; 479 long max = 0; 480 for (Channel channel : allChannels) { 481 long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel); 482 total += outbound; 483 max = Math.max(max, outbound); 484 } 485 return Pair.newPair(total, max); 486 } 487}