/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;

import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * RPC connection implementation based on netty.
 * <p>
 * Most operations are executed in handlers. A Netty handler is always executed in the same
 * thread (EventLoop), so no lock is needed.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcConnection extends RpcConnection {

  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));

  private final NettyRpcClient rpcClient;

  private ByteBuf connectionHeaderPreamble;

  private ByteBuf connectionHeaderWithLength;

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "connect is also under lock as notifyOnCancel will call our action directly")
  private Channel channel;

  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
    this.rpcClient = rpcClient;
    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
    this.connectionHeaderPreamble =
        Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
    ConnectionHeader header = getConnectionHeader();
    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
  }

  @Override
  protected synchronized void callTimeout(Call call) {
    if (channel != null) {
      channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
    }
  }

  @Override
  public synchronized boolean isActive() {
    return channel != null;
  }

  private void shutdown0() {
    if (channel != null) {
      channel.close();
      channel = null;
    }
  }

  @Override
  public synchronized void shutdown() {
    shutdown0();
  }

  @Override
  public synchronized void cleanupConnection() {
    if (connectionHeaderPreamble != null) {
      ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
    }
    if (connectionHeaderWithLength != null) {
      ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
    }
  }
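  /**
   * Finishes connection setup: inserts the idle-state, frame-length and RPC duplex handlers
   * ahead of the {@link BufferCallBeforeInitHandler}, then fires a success event so that calls
   * buffered while the connection was being set up are flushed to the server.
   */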
  private void established(Channel ch) throws IOException {
    ChannelPipeline p = ch.pipeline();
    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
    p.addBefore(addBeforeHandler, null,
        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
    p.addBefore(addBeforeHandler, null,
        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
    p.fireUserEventTriggered(BufferCallEvent.success());
  }

  private boolean reloginInProgress;

  private void scheduleRelogin(Throwable error) {
    if (error instanceof FallbackDisallowedException) {
      return;
    }
    synchronized (this) {
      if (reloginInProgress) {
        return;
      }
      reloginInProgress = true;
      RELOGIN_EXECUTOR.schedule(new Runnable() {

        @Override
        public void run() {
          try {
            if (shouldAuthenticateOverKrb()) {
              relogin();
            }
          } catch (IOException e) {
            LOG.warn("Relogin failed", e);
          }
          // lock the outer connection instance, which guards reloginInProgress; a plain
          // synchronized (this) here would only lock the anonymous Runnable
          synchronized (NettyRpcConnection.this) {
            reloginInProgress = false;
          }
        }
      }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
    }
  }

  private void failInit(Channel ch, IOException e) {
    synchronized (this) {
      // fail all pending calls
      ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
      shutdown0();
    }
  }
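  /**
   * Runs SASL negotiation by adding a {@link SaslChallengeDecoder} and a
   * {@link NettyHBaseSaslRpcClientHandler} to the front of the pipeline. Once negotiation
   * completes, both handlers are removed again and the connection header is sent, either by a
   * dedicated {@link NettyHBaseRpcConnectionHeaderHandler} or as a plain write, before the
   * connection is marked as established.
   */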
  private void saslNegotiate(final Channel ch) {
    UserGroupInformation ticket = getUGI();
    if (ticket == null) {
      failInit(ch, new FatalConnectionException("ticket/user is null"));
      return;
    }
    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
    final NettyHBaseSaslRpcClientHandler saslHandler;
    try {
      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
          serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf);
    } catch (IOException e) {
      failInit(ch, e);
      return;
    }
    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
    saslPromise.addListener(new FutureListener<Boolean>() {

      @Override
      public void operationComplete(Future<Boolean> future) throws Exception {
        if (future.isSuccess()) {
          ChannelPipeline p = ch.pipeline();
          p.remove(SaslChallengeDecoder.class);
          p.remove(NettyHBaseSaslRpcClientHandler.class);

          // check whether we still need to negotiate the connection header with the server
          if (saslHandler.isNeedProcessConnectionHeader()) {
            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
            // create the handler to handle the connection header
            ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
                connectionHeaderPromise, conf, connectionHeaderWithLength);

            // add a ReadTimeoutHandler in case the server never responds with the connection
            // header, e.g. because of mismatched configuration between client and server
            p.addFirst(
                new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
            p.addLast(chHandler);
            connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
              @Override
              public void operationComplete(Future<Boolean> future) throws Exception {
                if (future.isSuccess()) {
                  ChannelPipeline p = ch.pipeline();
                  p.remove(ReadTimeoutHandler.class);
                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
                  // no need to send the connection header here,
                  // NettyHBaseRpcConnectionHeaderHandler has already sent it
                  established(ch);
                } else {
                  final Throwable error = future.cause();
                  scheduleRelogin(error);
                  failInit(ch, toIOE(error));
                }
              }
            });
          } else {
            // send the connection header to server
            ch.write(connectionHeaderWithLength.retainedDuplicate());
            established(ch);
          }
        } else {
          final Throwable error = future.cause();
          scheduleRelogin(error);
          failInit(ch, toIOE(error));
        }
      }
    });
  }

  private void connect() {
    LOG.trace("Connecting to {}", remoteId.address);

    this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
        .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
        .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
        .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
        .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            Channel ch = future.channel();
            if (!future.isSuccess()) {
              failInit(ch, toIOE(future.cause()));
              rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
              return;
            }
            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
            if (useSasl) {
              saslNegotiate(ch);
            } else {
              // send the connection header to server
              ch.write(connectionHeaderWithLength.retainedDuplicate());
              established(ch);
            }
          }
        }).channel();
  }
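  /**
   * Writes and flushes a call on the given channel, propagating a write failure to the call so
   * that the caller does not hang on a request that never reached the wire.
   */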
  private void write(Channel ch, final Call call) {
    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {

      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Fail the call if we failed to write it out. This is usually because the channel is
        // closed. This is needed because we may shut down the channel inside the event loop and
        // there may still be some pending calls in the event loop queue after us.
        if (!future.isSuccess()) {
          call.setException(toIOE(future.cause()));
        }
      }
    });
  }

  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
    if (reloginInProgress) {
      throw new IOException("Can not send request because relogin is in progress.");
    }
    hrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        // lock the outer connection instance, which guards channel; a plain
        // synchronized (this) here would only lock the anonymous callback
        synchronized (NettyRpcConnection.this) {
          if (channel != null) {
            channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
        } else {
          if (channel == null) {
            connect();
          }
          scheduleTimeoutTask(call);
          final Channel ch = channel;
          // We must move the whole writeAndFlush call inside the event loop, otherwise there is a
          // race condition.
          // In netty's DefaultChannelPipeline, writeAndFlush locates the first outbound handler in
          // the calling thread and then schedules a task on the event loop that starts processing
          // from that handler. It is possible that the first handler is still
          // BufferCallBeforeInitHandler when we call writeAndFlush here, while the connection is
          // being set up concurrently: the event loop thread removes BufferCallBeforeInitHandler,
          // and then our writeAndFlush task runs and still calls the write method of
          // BufferCallBeforeInitHandler.
          // This may be considered a bug in netty, but there is a workaround, so let's handle it
          // ourselves first.
          if (ch.eventLoop().inEventLoop()) {
            write(ch, call);
          } else {
            ch.eventLoop().execute(new Runnable() {

              @Override
              public void run() {
                write(ch, call);
              }
            });
          }
        }
      }
    });
  }
}