001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED; 021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute; 023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; 024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 025 026import java.io.IOException; 027import java.net.InetSocketAddress; 028import java.net.UnknownHostException; 029import java.util.Set; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadLocalRandom; 033import java.util.concurrent.TimeUnit; 034import javax.security.sasl.SaslException; 035import org.apache.hadoop.hbase.client.ConnectionUtils; 036import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 037import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; 039import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; 040import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; 041import org.apache.hadoop.hbase.security.SaslChallengeDecoder; 042import org.apache.hadoop.hbase.util.NettyFutureUtils; 043import org.apache.hadoop.hbase.util.Threads; 044import org.apache.hadoop.security.UserGroupInformation; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 051import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 052import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 053import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 054import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 055import org.apache.hbase.thirdparty.io.netty.channel.Channel; 056import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 057import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 058import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 060import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 061import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 062import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 063import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; 064import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; 065import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 066import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; 067import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 068import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 069import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; 070import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 071 072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 073 074/** 075 * RPC connection implementation based on netty. 076 * <p/> 077 * Most operations are executed in handlers. Netty handler is always executed in the same 078 * thread(EventLoop) so no lock is needed. 079 * <p/> 080 * <strong>Implementation assumptions:</strong> All the private methods should be called in the 081 * {@link #eventLoop} thread, otherwise there will be races. 082 * @since 2.0.0 083 */ 084@InterfaceAudience.Private 085class NettyRpcConnection extends RpcConnection { 086 087 private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class); 088 089 private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors 090 .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d") 091 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 092 093 private final NettyRpcClient rpcClient; 094 095 // the event loop used to set up the connection, we will also execute other operations for this 096 // connection in this event loop, to avoid locking everywhere. 097 private final EventLoop eventLoop; 098 099 private ByteBuf connectionHeaderPreamble; 100 101 private ByteBuf connectionHeaderWithLength; 102 103 // make it volatile so in the isActive method below we do not need to switch to the event loop 104 // thread to access this field. 105 private volatile Channel channel; 106 107 NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { 108 super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, 109 rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, 110 rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes); 111 this.rpcClient = rpcClient; 112 this.eventLoop = rpcClient.group.next(); 113 byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); 114 this.connectionHeaderPreamble = 115 Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble); 116 ConnectionHeader header = getConnectionHeader(); 117 this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); 118 this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); 119 header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength)); 120 } 121 122 @Override 123 protected void callTimeout(Call call) { 124 execute(eventLoop, () -> { 125 if (channel != null) { 126 channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call)); 127 } 128 }); 129 } 130 131 @Override 132 public boolean isActive() { 133 return channel != null; 134 } 135 136 private void shutdown0() { 137 assert eventLoop.inEventLoop(); 138 if (channel != null) { 139 channel.close(); 140 channel = null; 141 } 142 } 143 144 @Override 145 public void shutdown() { 146 execute(eventLoop, this::shutdown0); 147 } 148 149 @Override 150 public void cleanupConnection() { 151 execute(eventLoop, () -> { 152 if (connectionHeaderPreamble != null) { 153 ReferenceCountUtil.safeRelease(connectionHeaderPreamble); 154 connectionHeaderPreamble = null; 155 } 156 if (connectionHeaderWithLength != null) { 157 ReferenceCountUtil.safeRelease(connectionHeaderWithLength); 158 connectionHeaderWithLength = null; 159 } 160 }); 161 } 162 163 private void established(Channel ch) { 164 assert eventLoop.inEventLoop(); 165 ch.pipeline() 166 .addBefore(BufferCallBeforeInitHandler.NAME, null, 167 new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)) 168 .addBefore(BufferCallBeforeInitHandler.NAME, null, 169 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)) 170 .addBefore(BufferCallBeforeInitHandler.NAME, null, 171 new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)) 172 .fireUserEventTriggered(BufferCallEvent.success()); 173 } 174 175 private void saslEstablished(Channel ch, String serverPrincipal) { 176 saslNegotiationDone(serverPrincipal, true); 177 established(ch); 178 } 179 180 private boolean reloginInProgress; 181 182 private void scheduleRelogin(Throwable error) { 183 assert eventLoop.inEventLoop(); 184 if (error instanceof FallbackDisallowedException) { 185 return; 186 } 187 if (!provider.canRetry()) { 188 LOG.trace("SASL Provider does not support retries"); 189 return; 190 } 191 if (reloginInProgress) { 192 return; 193 } 194 reloginInProgress = true; 195 RELOGIN_EXECUTOR.schedule(() -> { 196 try { 197 provider.relogin(); 198 } catch (IOException e) { 199 LOG.warn("Relogin failed", e); 200 } 201 eventLoop.execute(() -> { 202 reloginInProgress = false; 203 }); 204 }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS); 205 } 206 207 private void failInit(Channel ch, IOException e) { 208 assert eventLoop.inEventLoop(); 209 // fail all pending calls 210 ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e)); 211 shutdown0(); 212 rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), e); 213 } 214 215 private void saslFailInit(Channel ch, String serverPrincipal, IOException error) { 216 assert eventLoop.inEventLoop(); 217 saslNegotiationDone(serverPrincipal, false); 218 failInit(ch, error); 219 } 220 221 private void saslNegotiate(Channel ch, String serverPrincipal) { 222 assert eventLoop.inEventLoop(); 223 NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate()); 224 UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket()); 225 if (ticket == null) { 226 saslFailInit(ch, serverPrincipal, new FatalConnectionException("ticket/user is null")); 227 return; 228 } 229 Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); 230 final NettyHBaseSaslRpcClientHandler saslHandler; 231 try { 232 saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, 233 ((InetSocketAddress) ch.remoteAddress()).getAddress(), serverPrincipal, 234 rpcClient.fallbackAllowed, this.rpcClient.conf); 235 } catch (IOException e) { 236 saslFailInit(ch, serverPrincipal, e); 237 return; 238 } 239 ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder()) 240 .addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME, 241 saslHandler); 242 NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() { 243 244 @Override 245 public void operationComplete(Future<Boolean> future) throws Exception { 246 if (future.isSuccess()) { 247 ChannelPipeline p = ch.pipeline(); 248 // check if negotiate with server for connection header is necessary 249 if (saslHandler.isNeedProcessConnectionHeader()) { 250 Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise(); 251 // create the handler to handle the connection header 252 NettyHBaseRpcConnectionHeaderHandler chHandler = 253 new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf, 254 connectionHeaderWithLength); 255 256 // add ReadTimeoutHandler to deal with server doesn't response connection header 257 // because of the different configuration in client side and server side 258 final String readTimeoutHandlerName = "ReadTimeout"; 259 p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName, 260 new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS)) 261 .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler); 262 NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() { 263 @Override 264 public void operationComplete(Future<Boolean> future) throws Exception { 265 if (future.isSuccess()) { 266 ChannelPipeline p = ch.pipeline(); 267 p.remove(readTimeoutHandlerName); 268 p.remove(NettyHBaseRpcConnectionHeaderHandler.class); 269 // don't send connection header, NettyHBaseRpcConnectionHeaderHandler 270 // sent it already 271 saslEstablished(ch, serverPrincipal); 272 } else { 273 final Throwable error = future.cause(); 274 scheduleRelogin(error); 275 saslFailInit(ch, serverPrincipal, toIOE(error)); 276 } 277 } 278 }); 279 } else { 280 // send the connection header to server 281 ch.write(connectionHeaderWithLength.retainedDuplicate()); 282 saslEstablished(ch, serverPrincipal); 283 } 284 } else { 285 final Throwable error = future.cause(); 286 scheduleRelogin(error); 287 saslFailInit(ch, serverPrincipal, toIOE(error)); 288 } 289 } 290 }); 291 } 292 293 private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) { 294 assert eventLoop.inEventLoop(); 295 PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this, 296 RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall); 297 } 298 299 private void onSecurityPreambleError(Channel ch, Set<String> serverPrincipals, 300 IOException error) { 301 assert eventLoop.inEventLoop(); 302 LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address, error); 303 if (ConnectionUtils.isUnexpectedPreambleHeaderException(error)) { 304 // this means we are connecting to an old server which does not support the security 305 // preamble call, so we should fallback to randomly select a principal to use 306 // TODO: find a way to reconnect without failing all the pending calls, for now, when we 307 // reach here, shutdown should have already been scheduled 308 return; 309 } 310 if (IPCUtil.isSecurityNotEnabledException(error)) { 311 // server tells us security is not enabled, then we should check whether fallback to 312 // simple is allowed, if so we just go without security, otherwise we should fail the 313 // negotiation immediately 314 if (rpcClient.fallbackAllowed) { 315 // TODO: just change the preamble and skip the fallback to simple logic, for now, just 316 // select the first principal can finish the connection setup, but waste one client 317 // message 318 saslNegotiate(ch, serverPrincipals.iterator().next()); 319 } else { 320 failInit(ch, new FallbackDisallowedException()); 321 } 322 return; 323 } 324 // usually we should not reach here, but for robust, just randomly select a principal to 325 // connect 326 saslNegotiate(ch, randomSelect(serverPrincipals)); 327 } 328 329 private void onSecurityPreambleFinish(Channel ch, Set<String> serverPrincipals, 330 Call securityPreambleCall) { 331 assert eventLoop.inEventLoop(); 332 String serverPrincipal; 333 try { 334 serverPrincipal = chooseServerPrincipal(serverPrincipals, securityPreambleCall); 335 } catch (SaslException e) { 336 failInit(ch, e); 337 return; 338 } 339 saslNegotiate(ch, serverPrincipal); 340 } 341 342 private void saslNegotiate(Channel ch) throws IOException { 343 assert eventLoop.inEventLoop(); 344 Set<String> serverPrincipals = getServerPrincipals(); 345 if (serverPrincipals.size() == 1) { 346 saslNegotiate(ch, serverPrincipals.iterator().next()); 347 return; 348 } 349 // this means we use kerberos authentication and there are multiple server principal candidates, 350 // in this way we need to send a special preamble header to get server principal from server 351 Call securityPreambleCall = createSecurityPreambleCall(call -> { 352 assert eventLoop.inEventLoop(); 353 if (call.error != null) { 354 onSecurityPreambleError(ch, serverPrincipals, call.error); 355 } else { 356 onSecurityPreambleFinish(ch, serverPrincipals, call); 357 } 358 }); 359 PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this, 360 RpcClient.SECURITY_PREAMBLE_HEADER, securityPreambleCall); 361 } 362 363 private void connect(Call connectionRegistryCall) throws UnknownHostException { 364 assert eventLoop.inEventLoop(); 365 LOG.trace("Connecting to {}", remoteId.getAddress()); 366 InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); 367 this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) 368 .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) 369 .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) 370 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) 371 .handler(new ChannelInitializer<Channel>() { 372 @Override 373 protected void initChannel(Channel ch) throws Exception { 374 if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) { 375 SslContext sslContext = rpcClient.getSslContext(); 376 SslHandler sslHandler = sslContext.newHandler(ch.alloc(), 377 remoteId.address.getHostName(), remoteId.address.getPort()); 378 sslHandler.setHandshakeTimeoutMillis( 379 conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT, 380 X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS)); 381 ch.pipeline().addFirst(sslHandler); 382 LOG.debug("SSL handler added with handshake timeout {} ms", 383 sslHandler.getHandshakeTimeoutMillis()); 384 } 385 ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME, 386 new BufferCallBeforeInitHandler()); 387 } 388 }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect() 389 .addListener(new ChannelFutureListener() { 390 391 private void succeed(Channel ch) throws IOException { 392 if (connectionRegistryCall != null) { 393 getConnectionRegistry(ch, connectionRegistryCall); 394 return; 395 } 396 if (!useSasl) { 397 // BufferCallBeforeInitHandler will call ctx.flush when receiving the 398 // BufferCallEvent.success() event, so here we just use write for the below two messages 399 NettyFutureUtils.safeWrite(ch, connectionHeaderPreamble.retainedDuplicate()); 400 NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate()); 401 established(ch); 402 } else { 403 saslNegotiate(ch); 404 } 405 } 406 407 private void fail(Channel ch, Throwable error) { 408 IOException ex = toIOE(error); 409 LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(), 410 ex); 411 if (connectionRegistryCall != null) { 412 connectionRegistryCall.setException(ex); 413 } 414 failInit(ch, ex); 415 } 416 417 @Override 418 public void operationComplete(ChannelFuture future) throws Exception { 419 Channel ch = future.channel(); 420 if (!future.isSuccess()) { 421 fail(ch, future.cause()); 422 return; 423 } 424 SslHandler sslHandler = ch.pipeline().get(SslHandler.class); 425 if (sslHandler != null) { 426 NettyFutureUtils.addListener(sslHandler.handshakeFuture(), f -> { 427 if (f.isSuccess()) { 428 succeed(ch); 429 } else { 430 fail(ch, f.cause()); 431 } 432 }); 433 } else { 434 succeed(ch); 435 } 436 } 437 }).channel(); 438 } 439 440 private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException { 441 assert eventLoop.inEventLoop(); 442 if (call.isConnectionRegistryCall()) { 443 // For get connection registry call, we will send a special preamble header to get the 444 // response, instead of sending a real rpc call. See HBASE-25051 445 connect(call); 446 return; 447 } 448 if (reloginInProgress) { 449 throw new IOException(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS); 450 } 451 hrc.notifyOnCancel(new RpcCallback<Object>() { 452 453 @Override 454 public void run(Object parameter) { 455 setCancelled(call); 456 if (channel != null) { 457 channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call)); 458 } 459 } 460 }, new CancellationCallback() { 461 462 @Override 463 public void run(boolean cancelled) throws IOException { 464 if (cancelled) { 465 setCancelled(call); 466 } else { 467 if (channel == null) { 468 connect(null); 469 } 470 scheduleTimeoutTask(call); 471 channel.writeAndFlush(call).addListener(new ChannelFutureListener() { 472 473 @Override 474 public void operationComplete(ChannelFuture future) throws Exception { 475 // Fail the call if we failed to write it out. This usually because the channel is 476 // closed. This is needed because we may shutdown the channel inside event loop and 477 // there may still be some pending calls in the event loop queue after us. 478 if (!future.isSuccess()) { 479 call.setException(toIOE(future.cause())); 480 } 481 } 482 }); 483 } 484 } 485 }); 486 } 487 488 @Override 489 public void sendRequest(final Call call, HBaseRpcController hrc) { 490 execute(eventLoop, () -> { 491 try { 492 sendRequest0(call, hrc); 493 } catch (Exception e) { 494 call.setException(toIOE(e)); 495 } 496 }); 497 } 498}