001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED; 021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute; 023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; 024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 025 026import java.io.IOException; 027import java.net.InetSocketAddress; 028import java.net.UnknownHostException; 029import java.util.concurrent.Executors; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.ThreadLocalRandom; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; 034import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; 035import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; 036import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; 037import org.apache.hadoop.hbase.security.SaslChallengeDecoder; 038import org.apache.hadoop.hbase.util.NettyFutureUtils; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 046import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 047import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 048import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 049import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 050import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 051import org.apache.hbase.thirdparty.io.netty.channel.Channel; 052import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 053import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 054import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 055import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 056import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 057import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 058import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 059import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 060import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; 061import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 062import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 063import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; 064import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 065 066import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 067 068/** 069 * RPC connection implementation based on netty. 070 * <p/> 071 * Most operations are executed in handlers. Netty handler is always executed in the same 072 * thread(EventLoop) so no lock is needed. 073 * <p/> 074 * <strong>Implementation assumptions:</strong> All the private methods should be called in the 075 * {@link #eventLoop} thread, otherwise there will be races. 076 * @since 2.0.0 077 */ 078@InterfaceAudience.Private 079class NettyRpcConnection extends RpcConnection { 080 081 private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class); 082 083 private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors 084 .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d") 085 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 086 087 private final NettyRpcClient rpcClient; 088 089 // the event loop used to set up the connection, we will also execute other operations for this 090 // connection in this event loop, to avoid locking everywhere. 091 private final EventLoop eventLoop; 092 093 private ByteBuf connectionHeaderPreamble; 094 095 private ByteBuf connectionHeaderWithLength; 096 097 // make it volatile so in the isActive method below we do not need to switch to the event loop 098 // thread to access this field. 099 private volatile Channel channel; 100 101 NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { 102 super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, 103 rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, 104 rpcClient.metrics); 105 this.rpcClient = rpcClient; 106 this.eventLoop = rpcClient.group.next(); 107 byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); 108 this.connectionHeaderPreamble = 109 Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble); 110 ConnectionHeader header = getConnectionHeader(); 111 this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); 112 this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); 113 header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength)); 114 } 115 116 @Override 117 protected void callTimeout(Call call) { 118 execute(eventLoop, () -> { 119 if (channel != null) { 120 channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call)); 121 } 122 }); 123 } 124 125 @Override 126 public boolean isActive() { 127 return channel != null; 128 } 129 130 private void shutdown0() { 131 assert eventLoop.inEventLoop(); 132 if (channel != null) { 133 NettyFutureUtils.consume(channel.close()); 134 channel = null; 135 } 136 } 137 138 @Override 139 public void shutdown() { 140 execute(eventLoop, this::shutdown0); 141 } 142 143 @Override 144 public void cleanupConnection() { 145 execute(eventLoop, () -> { 146 if (connectionHeaderPreamble != null) { 147 ReferenceCountUtil.safeRelease(connectionHeaderPreamble); 148 connectionHeaderPreamble = null; 149 } 150 if (connectionHeaderWithLength != null) { 151 ReferenceCountUtil.safeRelease(connectionHeaderWithLength); 152 connectionHeaderWithLength = null; 153 } 154 }); 155 } 156 157 private void established(Channel ch) throws IOException { 158 assert eventLoop.inEventLoop(); 159 ch.pipeline() 160 .addBefore(BufferCallBeforeInitHandler.NAME, null, 161 new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)) 162 .addBefore(BufferCallBeforeInitHandler.NAME, null, 163 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)) 164 .addBefore(BufferCallBeforeInitHandler.NAME, null, 165 new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)) 166 .fireUserEventTriggered(BufferCallEvent.success()); 167 } 168 169 private boolean reloginInProgress; 170 171 @SuppressWarnings("FutureReturnValueIgnored") 172 private void scheduleRelogin(Throwable error) { 173 assert eventLoop.inEventLoop(); 174 if (error instanceof FallbackDisallowedException) { 175 return; 176 } 177 if (!provider.canRetry()) { 178 LOG.trace("SASL Provider does not support retries"); 179 return; 180 } 181 if (reloginInProgress) { 182 return; 183 } 184 reloginInProgress = true; 185 RELOGIN_EXECUTOR.schedule(() -> { 186 try { 187 provider.relogin(); 188 } catch (IOException e) { 189 LOG.warn("Relogin failed", e); 190 } 191 eventLoop.execute(() -> { 192 reloginInProgress = false; 193 }); 194 }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS); 195 } 196 197 private void failInit(Channel ch, IOException e) { 198 assert eventLoop.inEventLoop(); 199 // fail all pending calls 200 ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e)); 201 shutdown0(); 202 } 203 204 private void saslNegotiate(final Channel ch) { 205 assert eventLoop.inEventLoop(); 206 UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket()); 207 if (ticket == null) { 208 failInit(ch, new FatalConnectionException("ticket/user is null")); 209 return; 210 } 211 Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); 212 final NettyHBaseSaslRpcClientHandler saslHandler; 213 try { 214 saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, 215 ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo, 216 rpcClient.fallbackAllowed, this.rpcClient.conf); 217 } catch (IOException e) { 218 failInit(ch, e); 219 return; 220 } 221 ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder()) 222 .addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler); 223 NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() { 224 225 @Override 226 public void operationComplete(Future<Boolean> future) throws Exception { 227 if (future.isSuccess()) { 228 ChannelPipeline p = ch.pipeline(); 229 p.remove(SaslChallengeDecoder.class); 230 p.remove(NettyHBaseSaslRpcClientHandler.class); 231 232 // check if negotiate with server for connection header is necessary 233 if (saslHandler.isNeedProcessConnectionHeader()) { 234 Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise(); 235 // create the handler to handle the connection header 236 NettyHBaseRpcConnectionHeaderHandler chHandler = 237 new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf, 238 connectionHeaderWithLength); 239 240 // add ReadTimeoutHandler to deal with server doesn't response connection header 241 // because of the different configuration in client side and server side 242 final String readTimeoutHandlerName = "ReadTimeout"; 243 p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName, 244 new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS)) 245 .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler); 246 NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() { 247 @Override 248 public void operationComplete(Future<Boolean> future) throws Exception { 249 if (future.isSuccess()) { 250 ChannelPipeline p = ch.pipeline(); 251 p.remove(readTimeoutHandlerName); 252 p.remove(NettyHBaseRpcConnectionHeaderHandler.class); 253 // don't send connection header, NettyHBaseRpcConnectionHeaderHandler 254 // sent it already 255 established(ch); 256 } else { 257 final Throwable error = future.cause(); 258 scheduleRelogin(error); 259 failInit(ch, toIOE(error)); 260 } 261 } 262 }); 263 } else { 264 // send the connection header to server 265 NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate()); 266 established(ch); 267 } 268 } else { 269 final Throwable error = future.cause(); 270 scheduleRelogin(error); 271 failInit(ch, toIOE(error)); 272 } 273 } 274 }); 275 } 276 277 private void connect() throws UnknownHostException { 278 assert eventLoop.inEventLoop(); 279 LOG.trace("Connecting to {}", remoteId.getAddress()); 280 InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); 281 this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) 282 .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) 283 .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) 284 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) 285 .handler(new ChannelInitializer<Channel>() { 286 287 @Override 288 protected void initChannel(Channel ch) throws Exception { 289 ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME, 290 new BufferCallBeforeInitHandler()); 291 } 292 }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect() 293 .addListener(new ChannelFutureListener() { 294 295 @Override 296 public void operationComplete(ChannelFuture future) throws Exception { 297 Channel ch = future.channel(); 298 if (!future.isSuccess()) { 299 failInit(ch, toIOE(future.cause())); 300 rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause()); 301 return; 302 } 303 NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate()); 304 if (useSasl) { 305 saslNegotiate(ch); 306 } else { 307 // send the connection header to server 308 NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate()); 309 established(ch); 310 } 311 } 312 }).channel(); 313 } 314 315 private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException { 316 assert eventLoop.inEventLoop(); 317 if (reloginInProgress) { 318 throw new IOException("Can not send request because relogin is in progress."); 319 } 320 hrc.notifyOnCancel(new RpcCallback<Object>() { 321 322 @Override 323 public void run(Object parameter) { 324 setCancelled(call); 325 if (channel != null) { 326 channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call)); 327 } 328 } 329 }, new CancellationCallback() { 330 331 @Override 332 public void run(boolean cancelled) throws IOException { 333 if (cancelled) { 334 setCancelled(call); 335 } else { 336 if (channel == null) { 337 connect(); 338 } 339 scheduleTimeoutTask(call); 340 NettyFutureUtils.addListener(channel.writeAndFlush(call), new ChannelFutureListener() { 341 @Override 342 public void operationComplete(ChannelFuture future) throws Exception { 343 // Fail the call if we failed to write it out. This usually because the channel is 344 // closed. This is needed because we may shutdown the channel inside event loop and 345 // there may still be some pending calls in the event loop queue after us. 346 if (!future.isSuccess()) { 347 call.setException(toIOE(future.cause())); 348 } 349 } 350 }); 351 } 352 } 353 }); 354 } 355 356 @Override 357 public void sendRequest(final Call call, HBaseRpcController hrc) { 358 execute(eventLoop, () -> { 359 try { 360 sendRequest0(call, hrc); 361 } catch (Exception e) { 362 call.setException(toIOE(e)); 363 } 364 }); 365 } 366}