001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED; 021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute; 023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; 024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 025 026import java.io.IOException; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; 032import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; 033import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; 034import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; 035import org.apache.hadoop.hbase.security.SaslChallengeDecoder; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.hadoop.security.UserGroupInformation; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 043import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 044import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 045import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 046import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 047import org.apache.hbase.thirdparty.io.netty.channel.Channel; 048import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 049import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 050import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; 051import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 052import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 053import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 054import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 055import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 056import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; 057import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 058import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 059import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; 060import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 063 064/** 065 * RPC connection implementation based on netty. 066 * <p/> 067 * Most operations are executed in handlers. Netty handler is always executed in the same 068 * thread(EventLoop) so no lock is needed. 069 * <p/> 070 * <strong>Implementation assumptions:</strong> All the private methods should be called in the 071 * {@link #eventLoop} thread, otherwise there will be races. 072 * @since 2.0.0 073 */ 074@InterfaceAudience.Private 075class NettyRpcConnection extends RpcConnection { 076 077 private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class); 078 079 private static final ScheduledExecutorService RELOGIN_EXECUTOR = 080 Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin")); 081 082 private final NettyRpcClient rpcClient; 083 084 // the event loop used to set up the connection, we will also execute other operations for this 085 // connection in this event loop, to avoid locking everywhere. 086 private final EventLoop eventLoop; 087 088 private ByteBuf connectionHeaderPreamble; 089 090 private ByteBuf connectionHeaderWithLength; 091 092 // make it volatile so in the isActive method below we do not need to switch to the event loop 093 // thread to access this field. 094 private volatile Channel channel; 095 096 NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { 097 super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, 098 rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); 099 this.rpcClient = rpcClient; 100 this.eventLoop = rpcClient.group.next(); 101 byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); 102 this.connectionHeaderPreamble = 103 Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble); 104 ConnectionHeader header = getConnectionHeader(); 105 this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); 106 this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); 107 header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength)); 108 } 109 110 @Override 111 protected void callTimeout(Call call) { 112 execute(eventLoop, () -> { 113 if (channel != null) { 114 channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call)); 115 } 116 }); 117 } 118 119 @Override 120 public boolean isActive() { 121 return channel != null; 122 } 123 124 private void shutdown0() { 125 assert eventLoop.inEventLoop(); 126 if (channel != null) { 127 channel.close(); 128 channel = null; 129 } 130 } 131 132 @Override 133 public void shutdown() { 134 execute(eventLoop, this::shutdown0); 135 } 136 137 @Override 138 public void cleanupConnection() { 139 execute(eventLoop, () -> { 140 if (connectionHeaderPreamble != null) { 141 ReferenceCountUtil.safeRelease(connectionHeaderPreamble); 142 connectionHeaderPreamble = null; 143 } 144 if (connectionHeaderWithLength != null) { 145 ReferenceCountUtil.safeRelease(connectionHeaderWithLength); 146 connectionHeaderWithLength = null; 147 } 148 }); 149 } 150 151 private void established(Channel ch) throws IOException { 152 assert eventLoop.inEventLoop(); 153 ChannelPipeline p = ch.pipeline(); 154 String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name(); 155 p.addBefore(addBeforeHandler, null, 156 new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); 157 p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); 158 p.addBefore(addBeforeHandler, null, 159 new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); 160 p.fireUserEventTriggered(BufferCallEvent.success()); 161 } 162 163 private boolean reloginInProgress; 164 165 private void scheduleRelogin(Throwable error) { 166 assert eventLoop.inEventLoop(); 167 if (error instanceof FallbackDisallowedException) { 168 return; 169 } 170 if (!provider.canRetry()) { 171 LOG.trace("SASL Provider does not support retries"); 172 return; 173 } 174 if (reloginInProgress) { 175 return; 176 } 177 reloginInProgress = true; 178 RELOGIN_EXECUTOR.schedule(() -> { 179 try { 180 provider.relogin(); 181 } catch (IOException e) { 182 LOG.warn("Relogin failed", e); 183 } 184 eventLoop.execute(() -> { 185 reloginInProgress = false; 186 }); 187 }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS); 188 } 189 190 private void failInit(Channel ch, IOException e) { 191 assert eventLoop.inEventLoop(); 192 // fail all pending calls 193 ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e)); 194 shutdown0(); 195 } 196 197 private void saslNegotiate(final Channel ch) { 198 assert eventLoop.inEventLoop(); 199 UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket()); 200 if (ticket == null) { 201 failInit(ch, new FatalConnectionException("ticket/user is null")); 202 return; 203 } 204 Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); 205 final NettyHBaseSaslRpcClientHandler saslHandler; 206 try { 207 saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, 208 serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf); 209 } catch (IOException e) { 210 failInit(ch, e); 211 return; 212 } 213 ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler); 214 saslPromise.addListener(new FutureListener<Boolean>() { 215 216 @Override 217 public void operationComplete(Future<Boolean> future) throws Exception { 218 if (future.isSuccess()) { 219 ChannelPipeline p = ch.pipeline(); 220 p.remove(SaslChallengeDecoder.class); 221 p.remove(NettyHBaseSaslRpcClientHandler.class); 222 223 // check if negotiate with server for connection header is necessary 224 if (saslHandler.isNeedProcessConnectionHeader()) { 225 Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise(); 226 // create the handler to handle the connection header 227 ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler( 228 connectionHeaderPromise, conf, connectionHeaderWithLength); 229 230 // add ReadTimeoutHandler to deal with server doesn't response connection header 231 // because of the different configuration in client side and server side 232 p.addFirst( 233 new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); 234 p.addLast(chHandler); 235 connectionHeaderPromise.addListener(new FutureListener<Boolean>() { 236 @Override 237 public void operationComplete(Future<Boolean> future) throws Exception { 238 if (future.isSuccess()) { 239 ChannelPipeline p = ch.pipeline(); 240 p.remove(ReadTimeoutHandler.class); 241 p.remove(NettyHBaseRpcConnectionHeaderHandler.class); 242 // don't send connection header, NettyHbaseRpcConnectionHeaderHandler 243 // sent it already 244 established(ch); 245 } else { 246 final Throwable error = future.cause(); 247 scheduleRelogin(error); 248 failInit(ch, toIOE(error)); 249 } 250 } 251 }); 252 } else { 253 // send the connection header to server 254 ch.write(connectionHeaderWithLength.retainedDuplicate()); 255 established(ch); 256 } 257 } else { 258 final Throwable error = future.cause(); 259 scheduleRelogin(error); 260 failInit(ch, toIOE(error)); 261 } 262 } 263 }); 264 } 265 266 private void connect() { 267 assert eventLoop.inEventLoop(); 268 LOG.trace("Connecting to {}", remoteId.address); 269 270 this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) 271 .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) 272 .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) 273 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) 274 .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) 275 .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { 276 277 @Override 278 public void operationComplete(ChannelFuture future) throws Exception { 279 Channel ch = future.channel(); 280 if (!future.isSuccess()) { 281 failInit(ch, toIOE(future.cause())); 282 rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause()); 283 return; 284 } 285 ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); 286 if (useSasl) { 287 saslNegotiate(ch); 288 } else { 289 // send the connection header to server 290 ch.write(connectionHeaderWithLength.retainedDuplicate()); 291 established(ch); 292 } 293 } 294 }).channel(); 295 } 296 297 private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException { 298 assert eventLoop.inEventLoop(); 299 if (reloginInProgress) { 300 throw new IOException("Can not send request because relogin is in progress."); 301 } 302 hrc.notifyOnCancel(new RpcCallback<Object>() { 303 304 @Override 305 public void run(Object parameter) { 306 setCancelled(call); 307 if (channel != null) { 308 channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call)); 309 } 310 } 311 }, new CancellationCallback() { 312 313 @Override 314 public void run(boolean cancelled) throws IOException { 315 if (cancelled) { 316 setCancelled(call); 317 } else { 318 if (channel == null) { 319 connect(); 320 } 321 scheduleTimeoutTask(call); 322 channel.writeAndFlush(call).addListener(new ChannelFutureListener() { 323 324 @Override 325 public void operationComplete(ChannelFuture future) throws Exception { 326 // Fail the call if we failed to write it out. This usually because the channel is 327 // closed. This is needed because we may shutdown the channel inside event loop and 328 // there may still be some pending calls in the event loop queue after us. 329 if (!future.isSuccess()) { 330 call.setException(toIOE(future.cause())); 331 } 332 } 333 }); 334 } 335 } 336 }); 337 } 338 339 @Override 340 public void sendRequest(final Call call, HBaseRpcController hrc) { 341 execute(eventLoop, () -> { 342 try { 343 sendRequest0(call, hrc); 344 } catch (Exception e) { 345 call.setException(toIOE(e)); 346 } 347 }); 348 } 349}