/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;

/**
 * RPC connection implementation based on netty.
 * <p/>
 * Most operations are executed in handlers. Netty handler is always executed in the same
 * thread(EventLoop) so no lock is needed.
 * <p/>
 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
 * {@link #eventLoop} thread, otherwise there will be races.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcConnection extends RpcConnection {

  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

  // Single daemon thread shared by all connections for running SASL provider relogin attempts.
  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private final NettyRpcClient rpcClient;

  // the event loop used to set up the connection, we will also execute other operations for this
  // connection in this event loop, to avoid locking everywhere.
  private final EventLoop eventLoop;

  // Pre-serialized connection preamble; released and nulled by cleanupConnection().
  private ByteBuf connectionHeaderPreamble;

  // Pre-serialized connection header prefixed with its 4-byte length; released and nulled by
  // cleanupConnection().
  private ByteBuf connectionHeaderWithLength;

  // make it volatile so in the isActive method below we do not need to switch to the event loop
  // thread to access this field.
  private volatile Channel channel;

  // True while a relogin task is scheduled or running. While set, sendRequest0 rejects requests.
  private boolean reloginInProgress;

  /**
   * Creates the connection, picks the next event loop from the client's group and serializes the
   * connection preamble and the length-prefixed connection header into direct buffers.
   * @param rpcClient the owning client, source of configuration, codec and the event loop group
   * @param remoteId identifies the remote endpoint and the user ticket
   * @throws IOException if building or serializing the connection header fails
   */
  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
    this.rpcClient = rpcClient;
    this.eventLoop = rpcClient.group.next();
    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
    this.connectionHeaderPreamble =
      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
    ConnectionHeader header = getConnectionHeader();
    // 4 extra bytes for the int length prefix written before the serialized header.
    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
  }

  /**
   * Notifies the pipeline that the given call has timed out by firing a {@code TIMEOUT}
   * {@link CallEvent}. Does nothing if there is currently no channel.
   * @param call the call that timed out
   */
  @Override
  protected void callTimeout(Call call) {
    execute(eventLoop, () -> {
      if (channel != null) {
        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
      }
    });
  }

  /**
   * @return {@code true} if this connection currently holds a channel reference
   */
  @Override
  public boolean isActive() {
    return channel != null;
  }

  /**
   * Closes the current channel, if any, and clears the reference so that a later request
   * triggers a new connect. Must be called in the event loop.
   */
  private void shutdown0() {
    assert eventLoop.inEventLoop();
    if (channel != null) {
      channel.close();
      channel = null;
    }
  }

  /**
   * Closes the current channel, if any, by handing {@link #shutdown0()} to the event loop.
   */
  @Override
  public void shutdown() {
    execute(eventLoop, this::shutdown0);
  }

  /**
   * Releases the pre-serialized preamble and connection header buffers and clears the references
   * so each buffer is released at most once by this method.
   */
  @Override
  public void cleanupConnection() {
    execute(eventLoop, () -> {
      if (connectionHeaderPreamble != null) {
        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
        connectionHeaderPreamble = null;
      }
      if (connectionHeaderWithLength != null) {
        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
        connectionHeaderWithLength = null;
      }
    });
  }

  /**
   * Installs the idle-state handler, the length-field frame decoder and the RPC duplex handler
   * in front of the {@link BufferCallBeforeInitHandler}, then fires a success
   * {@link BufferCallEvent} through the pipeline. Must be called in the event loop.
   * @param ch the channel whose connection setup has completed
   * @throws IOException declared for callers; propagated from the setup steps if any of them fails
   */
  private void established(Channel ch) throws IOException {
    assert eventLoop.inEventLoop();
    ChannelPipeline p = ch.pipeline();
    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
    p.addBefore(addBeforeHandler, null,
      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
    p.addBefore(addBeforeHandler, null,
      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
    p.fireUserEventTriggered(BufferCallEvent.success());
  }

  /**
   * Schedules a relogin of the SASL provider after a random delay below
   * {@code reloginMaxBackoff} milliseconds, unless the error is a
   * {@link FallbackDisallowedException}, the provider cannot retry, or a relogin is already
   * pending. Must be called in the event loop.
   * <p/>
   * While a relogin is pending {@link #sendRequest0(Call, HBaseRpcController)} rejects requests,
   * so the pending flag is always cleared once the attempt finishes, whether it succeeded or
   * not.
   * @param error the failure that triggered the relogin attempt
   */
  private void scheduleRelogin(Throwable error) {
    assert eventLoop.inEventLoop();
    if (error instanceof FallbackDisallowedException) {
      return;
    }
    if (!provider.canRetry()) {
      LOG.trace("SASL Provider does not support retries");
      return;
    }
    if (reloginInProgress) {
      return;
    }
    // ThreadLocalRandom.nextInt(bound) rejects a non-positive bound; fall back to no delay so a
    // zero or negative backoff setting cannot abort the caller before it fails the pending calls.
    int delayMs =
      reloginMaxBackoff > 0 ? ThreadLocalRandom.current().nextInt(reloginMaxBackoff) : 0;
    reloginInProgress = true;
    RELOGIN_EXECUTOR.schedule(() -> {
      try {
        provider.relogin();
      } catch (IOException | RuntimeException e) {
        // An exception escaping a scheduled task is only stored in the discarded future, so log
        // unchecked failures here as well instead of losing them.
        LOG.warn("Relogin failed", e);
      } finally {
        // Always clear the flag in the event loop; otherwise a failed relogin would leave this
        // connection rejecting every request.
        eventLoop.execute(() -> {
          reloginInProgress = false;
        });
      }
    }, delayMs, TimeUnit.MILLISECONDS);
  }

  /**
   * Fails connection setup: fires a failure {@link BufferCallEvent} carrying the given exception
   * through the channel's pipeline and then shuts the connection down. Must be called in the
   * event loop.
   * @param ch the channel whose setup failed
   * @param e the cause reported to the buffered calls
   */
  private void failInit(Channel ch, IOException e) {
    assert eventLoop.inEventLoop();
    // fail all pending calls
    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
    shutdown0();
  }

  /**
   * Runs SASL negotiation on the given channel. On success the SASL handlers are removed and,
   * depending on the SASL handler, the connection header is either negotiated through a
   * {@link NettyHBaseRpcConnectionHeaderHandler} or written directly, after which the connection
   * is established. On failure a relogin is scheduled and the setup is failed. Must be called in
   * the event loop.
   * @param ch the connected channel
   */
  private void saslNegotiate(final Channel ch) {
    assert eventLoop.inEventLoop();
    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
    if (ticket == null) {
      failInit(ch, new FatalConnectionException("ticket/user is null"));
      return;
    }
    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
    final NettyHBaseSaslRpcClientHandler saslHandler;
    try {
      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
        serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
    } catch (IOException e) {
      failInit(ch, e);
      return;
    }
    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
    saslPromise.addListener(new FutureListener<Boolean>() {

      @Override
      public void operationComplete(Future<Boolean> future) throws Exception {
        if (future.isSuccess()) {
          ChannelPipeline p = ch.pipeline();
          p.remove(SaslChallengeDecoder.class);
          p.remove(NettyHBaseSaslRpcClientHandler.class);

          // check if negotiate with server for connection header is necessary
          if (saslHandler.isNeedProcessConnectionHeader()) {
            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
            // create the handler to handle the connection header
            ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
              connectionHeaderPromise, conf, connectionHeaderWithLength);

            // add ReadTimeoutHandler to deal with server doesn't response connection header
            // because of the different configuration in client side and server side
            p.addFirst(
              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
            p.addLast(chHandler);
            connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
              @Override
              public void operationComplete(Future<Boolean> future) throws Exception {
                if (future.isSuccess()) {
                  ChannelPipeline p = ch.pipeline();
                  p.remove(ReadTimeoutHandler.class);
                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
                  // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
                  // sent it already
                  established(ch);
                } else {
                  final Throwable error = future.cause();
                  scheduleRelogin(error);
                  failInit(ch, toIOE(error));
                }
              }
            });
          } else {
            // send the connection header to server
            ch.write(connectionHeaderWithLength.retainedDuplicate());
            established(ch);
          }
        } else {
          final Throwable error = future.cause();
          scheduleRelogin(error);
          failInit(ch, toIOE(error));
        }
      }
    });
  }

  /**
   * Starts an asynchronous connect to the remote address and stores the new channel in
   * {@link #channel}. When the connect completes, the listener either fails the setup and
   * records the server as failed, or writes the preamble and continues with SASL negotiation or
   * the plain connection header. Must be called in the event loop.
   */
  private void connect() {
    assert eventLoop.inEventLoop();
    LOG.trace("Connecting to {}", remoteId.address);

    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
      .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
      .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          Channel ch = future.channel();
          if (!future.isSuccess()) {
            failInit(ch, toIOE(future.cause()));
            rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
            return;
          }
          ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
          if (useSasl) {
            saslNegotiate(ch);
          } else {
            // send the connection header to server
            ch.write(connectionHeaderWithLength.retainedDuplicate());
            established(ch);
          }
        }
      }).channel();
  }

  /**
   * Registers cancellation callbacks for the call and, unless it is already cancelled, connects
   * if there is no channel, schedules the call's timeout task and writes the call to the
   * channel. Must be called in the event loop.
   * @param call the call to send
   * @param hrc the controller used to observe cancellation of the call
   * @throws IOException if a relogin is in progress, or if registering the callbacks fails
   */
  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
    assert eventLoop.inEventLoop();
    if (reloginInProgress) {
      throw new IOException("Can not send request because relogin is in progress.");
    }
    hrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        if (channel != null) {
          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
        } else {
          if (channel == null) {
            connect();
          }
          scheduleTimeoutTask(call);
          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              // Fail the call if we failed to write it out. This usually because the channel is
              // closed. This is needed because we may shutdown the channel inside event loop and
              // there may still be some pending calls in the event loop queue after us.
              if (!future.isSuccess()) {
                call.setException(toIOE(future.cause()));
              }
            }
          });
        }
      }
    });
  }

  /**
   * Sends the call through the event loop. Any exception raised while sending is converted to
   * an {@link IOException} and set on the call instead of being thrown to the caller.
   * @param call the call to send
   * @param hrc the controller used to observe cancellation of the call
   */
  @Override
  public void sendRequest(final Call call, HBaseRpcController hrc) {
    execute(eventLoop, () -> {
      try {
        sendRequest0(call, hrc);
      } catch (Exception e) {
        call.setException(toIOE(e));
      }
    });
  }
}