001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
025
026import java.io.IOException;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
032import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
033import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
034import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
035import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.hadoop.security.UserGroupInformation;
038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
044import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
045import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
046import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
047import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
048import org.apache.hbase.thirdparty.io.netty.channel.Channel;
049import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
050import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
051import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
052import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
053import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
054import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
055import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
056import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
057import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
058import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
059import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
060import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
061import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
064
065/**
066 * RPC connection implementation based on netty.
067 * <p/>
 * Most operations are executed in handlers. A Netty handler is always executed in the same
 * thread (the EventLoop), so no lock is needed.
070 * <p/>
071 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
072 * {@link #eventLoop} thread, otherwise there will be races.
073 * @since 2.0.0
074 */
075@InterfaceAudience.Private
076class NettyRpcConnection extends RpcConnection {
077
078  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
079
080  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
081    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
082      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
083
084  private final NettyRpcClient rpcClient;
085
086  // the event loop used to set up the connection, we will also execute other operations for this
087  // connection in this event loop, to avoid locking everywhere.
088  private final EventLoop eventLoop;
089
090  private ByteBuf connectionHeaderPreamble;
091
092  private ByteBuf connectionHeaderWithLength;
093
094  // make it volatile so in the isActive method below we do not need to switch to the event loop
095  // thread to access this field.
096  private volatile Channel channel;
097
098  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
099    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
100      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
101    this.rpcClient = rpcClient;
102    this.eventLoop = rpcClient.group.next();
103    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
104    this.connectionHeaderPreamble =
105      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
106    ConnectionHeader header = getConnectionHeader();
107    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
108    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
109    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
110  }
111
112  @Override
113  protected void callTimeout(Call call) {
114    execute(eventLoop, () -> {
115      if (channel != null) {
116        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
117      }
118    });
119  }
120
121  @Override
122  public boolean isActive() {
123    return channel != null;
124  }
125
126  private void shutdown0() {
127    assert eventLoop.inEventLoop();
128    if (channel != null) {
129      channel.close();
130      channel = null;
131    }
132  }
133
134  @Override
135  public void shutdown() {
136    execute(eventLoop, this::shutdown0);
137  }
138
139  @Override
140  public void cleanupConnection() {
141    execute(eventLoop, () -> {
142      if (connectionHeaderPreamble != null) {
143        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
144        connectionHeaderPreamble = null;
145      }
146      if (connectionHeaderWithLength != null) {
147        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
148        connectionHeaderWithLength = null;
149      }
150    });
151  }
152
153  private void established(Channel ch) throws IOException {
154    assert eventLoop.inEventLoop();
155    ChannelPipeline p = ch.pipeline();
156    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
157    p.addBefore(addBeforeHandler, null,
158      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
159    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
160    p.addBefore(addBeforeHandler, null,
161      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
162    p.fireUserEventTriggered(BufferCallEvent.success());
163  }
164
165  private boolean reloginInProgress;
166
167  private void scheduleRelogin(Throwable error) {
168    assert eventLoop.inEventLoop();
169    if (error instanceof FallbackDisallowedException) {
170      return;
171    }
172    if (!provider.canRetry()) {
173      LOG.trace("SASL Provider does not support retries");
174      return;
175    }
176    if (reloginInProgress) {
177      return;
178    }
179    reloginInProgress = true;
180    RELOGIN_EXECUTOR.schedule(() -> {
181      try {
182        provider.relogin();
183      } catch (IOException e) {
184        LOG.warn("Relogin failed", e);
185      }
186      eventLoop.execute(() -> {
187        reloginInProgress = false;
188      });
189    }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
190  }
191
192  private void failInit(Channel ch, IOException e) {
193    assert eventLoop.inEventLoop();
194    // fail all pending calls
195    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
196    shutdown0();
197  }
198
199  private void saslNegotiate(final Channel ch) {
200    assert eventLoop.inEventLoop();
201    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
202    if (ticket == null) {
203      failInit(ch, new FatalConnectionException("ticket/user is null"));
204      return;
205    }
206    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
207    final NettyHBaseSaslRpcClientHandler saslHandler;
208    try {
209      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
210        serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
211    } catch (IOException e) {
212      failInit(ch, e);
213      return;
214    }
215    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
216    saslPromise.addListener(new FutureListener<Boolean>() {
217
218      @Override
219      public void operationComplete(Future<Boolean> future) throws Exception {
220        if (future.isSuccess()) {
221          ChannelPipeline p = ch.pipeline();
222          p.remove(SaslChallengeDecoder.class);
223          p.remove(NettyHBaseSaslRpcClientHandler.class);
224
225          // check if negotiate with server for connection header is necessary
226          if (saslHandler.isNeedProcessConnectionHeader()) {
227            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
228            // create the handler to handle the connection header
229            ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
230              connectionHeaderPromise, conf, connectionHeaderWithLength);
231
232            // add ReadTimeoutHandler to deal with server doesn't response connection header
233            // because of the different configuration in client side and server side
234            p.addFirst(
235              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
236            p.addLast(chHandler);
237            connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
238              @Override
239              public void operationComplete(Future<Boolean> future) throws Exception {
240                if (future.isSuccess()) {
241                  ChannelPipeline p = ch.pipeline();
242                  p.remove(ReadTimeoutHandler.class);
243                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
244                  // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
245                  // sent it already
246                  established(ch);
247                } else {
248                  final Throwable error = future.cause();
249                  scheduleRelogin(error);
250                  failInit(ch, toIOE(error));
251                }
252              }
253            });
254          } else {
255            // send the connection header to server
256            ch.write(connectionHeaderWithLength.retainedDuplicate());
257            established(ch);
258          }
259        } else {
260          final Throwable error = future.cause();
261          scheduleRelogin(error);
262          failInit(ch, toIOE(error));
263        }
264      }
265    });
266  }
267
268  private void connect() {
269    assert eventLoop.inEventLoop();
270    LOG.trace("Connecting to {}", remoteId.address);
271
272    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
273      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
274      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
275      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
276      .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
277      .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
278
279        @Override
280        public void operationComplete(ChannelFuture future) throws Exception {
281          Channel ch = future.channel();
282          if (!future.isSuccess()) {
283            failInit(ch, toIOE(future.cause()));
284            rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
285            return;
286          }
287          ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
288          if (useSasl) {
289            saslNegotiate(ch);
290          } else {
291            // send the connection header to server
292            ch.write(connectionHeaderWithLength.retainedDuplicate());
293            established(ch);
294          }
295        }
296      }).channel();
297  }
298
299  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
300    assert eventLoop.inEventLoop();
301    if (reloginInProgress) {
302      throw new IOException("Can not send request because relogin is in progress.");
303    }
304    hrc.notifyOnCancel(new RpcCallback<Object>() {
305
306      @Override
307      public void run(Object parameter) {
308        setCancelled(call);
309        if (channel != null) {
310          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
311        }
312      }
313    }, new CancellationCallback() {
314
315      @Override
316      public void run(boolean cancelled) throws IOException {
317        if (cancelled) {
318          setCancelled(call);
319        } else {
320          if (channel == null) {
321            connect();
322          }
323          scheduleTimeoutTask(call);
324          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
325
326            @Override
327            public void operationComplete(ChannelFuture future) throws Exception {
328              // Fail the call if we failed to write it out. This usually because the channel is
329              // closed. This is needed because we may shutdown the channel inside event loop and
330              // there may still be some pending calls in the event loop queue after us.
331              if (!future.isSuccess()) {
332                call.setException(toIOE(future.cause()));
333              }
334            }
335          });
336        }
337      }
338    });
339  }
340
341  @Override
342  public void sendRequest(final Call call, HBaseRpcController hrc) {
343    execute(eventLoop, () -> {
344      try {
345        sendRequest0(call, hrc);
346      } catch (Exception e) {
347        call.setException(toIOE(e));
348      }
349    });
350  }
351}