001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
025
026import java.io.IOException;
027import java.util.concurrent.Executors;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
032import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
033import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
034import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
035import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.hadoop.security.UserGroupInformation;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
043import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
044import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
045import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
046import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
047import org.apache.hbase.thirdparty.io.netty.channel.Channel;
048import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
049import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
050import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
051import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
052import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
053import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
054import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
055import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
056import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
057import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
058import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
059import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
060import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
063
064/**
065 * RPC connection implementation based on netty.
066 * <p/>
067 * Most operations are executed in handlers. Netty handler is always executed in the same
068 * thread(EventLoop) so no lock is needed.
069 * <p/>
070 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
071 * {@link #eventLoop} thread, otherwise there will be races.
072 * @since 2.0.0
073 */
074@InterfaceAudience.Private
075class NettyRpcConnection extends RpcConnection {
076
077  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
078
079  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
080    Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
081
082  private final NettyRpcClient rpcClient;
083
084  // the event loop used to set up the connection, we will also execute other operations for this
085  // connection in this event loop, to avoid locking everywhere.
086  private final EventLoop eventLoop;
087
088  private ByteBuf connectionHeaderPreamble;
089
090  private ByteBuf connectionHeaderWithLength;
091
092  // make it volatile so in the isActive method below we do not need to switch to the event loop
093  // thread to access this field.
094  private volatile Channel channel;
095
096  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
097    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
098      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
099    this.rpcClient = rpcClient;
100    this.eventLoop = rpcClient.group.next();
101    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
102    this.connectionHeaderPreamble =
103      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
104    ConnectionHeader header = getConnectionHeader();
105    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
106    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
107    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
108  }
109
110  @Override
111  protected void callTimeout(Call call) {
112    execute(eventLoop, () -> {
113      if (channel != null) {
114        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
115      }
116    });
117  }
118
119  @Override
120  public boolean isActive() {
121    return channel != null;
122  }
123
124  private void shutdown0() {
125    assert eventLoop.inEventLoop();
126    if (channel != null) {
127      channel.close();
128      channel = null;
129    }
130  }
131
132  @Override
133  public void shutdown() {
134    execute(eventLoop, this::shutdown0);
135  }
136
137  @Override
138  public void cleanupConnection() {
139    execute(eventLoop, () -> {
140      if (connectionHeaderPreamble != null) {
141        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
142        connectionHeaderPreamble = null;
143      }
144      if (connectionHeaderWithLength != null) {
145        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
146        connectionHeaderWithLength = null;
147      }
148    });
149  }
150
151  private void established(Channel ch) throws IOException {
152    assert eventLoop.inEventLoop();
153    ChannelPipeline p = ch.pipeline();
154    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
155    p.addBefore(addBeforeHandler, null,
156      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
157    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
158    p.addBefore(addBeforeHandler, null,
159      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
160    p.fireUserEventTriggered(BufferCallEvent.success());
161  }
162
163  private boolean reloginInProgress;
164
165  private void scheduleRelogin(Throwable error) {
166    assert eventLoop.inEventLoop();
167    if (error instanceof FallbackDisallowedException) {
168      return;
169    }
170    if (!provider.canRetry()) {
171      LOG.trace("SASL Provider does not support retries");
172      return;
173    }
174    if (reloginInProgress) {
175      return;
176    }
177    reloginInProgress = true;
178    RELOGIN_EXECUTOR.schedule(() -> {
179      try {
180        provider.relogin();
181      } catch (IOException e) {
182        LOG.warn("Relogin failed", e);
183      }
184      eventLoop.execute(() -> {
185        reloginInProgress = false;
186      });
187    }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
188  }
189
190  private void failInit(Channel ch, IOException e) {
191    assert eventLoop.inEventLoop();
192    // fail all pending calls
193    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
194    shutdown0();
195  }
196
197  private void saslNegotiate(final Channel ch) {
198    assert eventLoop.inEventLoop();
199    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
200    if (ticket == null) {
201      failInit(ch, new FatalConnectionException("ticket/user is null"));
202      return;
203    }
204    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
205    final NettyHBaseSaslRpcClientHandler saslHandler;
206    try {
207      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
208        serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
209    } catch (IOException e) {
210      failInit(ch, e);
211      return;
212    }
213    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
214    saslPromise.addListener(new FutureListener<Boolean>() {
215
216      @Override
217      public void operationComplete(Future<Boolean> future) throws Exception {
218        if (future.isSuccess()) {
219          ChannelPipeline p = ch.pipeline();
220          p.remove(SaslChallengeDecoder.class);
221          p.remove(NettyHBaseSaslRpcClientHandler.class);
222
223          // check if negotiate with server for connection header is necessary
224          if (saslHandler.isNeedProcessConnectionHeader()) {
225            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
226            // create the handler to handle the connection header
227            ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
228              connectionHeaderPromise, conf, connectionHeaderWithLength);
229
230            // add ReadTimeoutHandler to deal with server doesn't response connection header
231            // because of the different configuration in client side and server side
232            p.addFirst(
233              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
234            p.addLast(chHandler);
235            connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
236              @Override
237              public void operationComplete(Future<Boolean> future) throws Exception {
238                if (future.isSuccess()) {
239                  ChannelPipeline p = ch.pipeline();
240                  p.remove(ReadTimeoutHandler.class);
241                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
242                  // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
243                  // sent it already
244                  established(ch);
245                } else {
246                  final Throwable error = future.cause();
247                  scheduleRelogin(error);
248                  failInit(ch, toIOE(error));
249                }
250              }
251            });
252          } else {
253            // send the connection header to server
254            ch.write(connectionHeaderWithLength.retainedDuplicate());
255            established(ch);
256          }
257        } else {
258          final Throwable error = future.cause();
259          scheduleRelogin(error);
260          failInit(ch, toIOE(error));
261        }
262      }
263    });
264  }
265
266  private void connect() {
267    assert eventLoop.inEventLoop();
268    LOG.trace("Connecting to {}", remoteId.address);
269
270    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
271      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
272      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
273      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
274      .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
275      .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
276
277        @Override
278        public void operationComplete(ChannelFuture future) throws Exception {
279          Channel ch = future.channel();
280          if (!future.isSuccess()) {
281            failInit(ch, toIOE(future.cause()));
282            rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
283            return;
284          }
285          ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
286          if (useSasl) {
287            saslNegotiate(ch);
288          } else {
289            // send the connection header to server
290            ch.write(connectionHeaderWithLength.retainedDuplicate());
291            established(ch);
292          }
293        }
294      }).channel();
295  }
296
297  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
298    assert eventLoop.inEventLoop();
299    if (reloginInProgress) {
300      throw new IOException("Can not send request because relogin is in progress.");
301    }
302    hrc.notifyOnCancel(new RpcCallback<Object>() {
303
304      @Override
305      public void run(Object parameter) {
306        setCancelled(call);
307        if (channel != null) {
308          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
309        }
310      }
311    }, new CancellationCallback() {
312
313      @Override
314      public void run(boolean cancelled) throws IOException {
315        if (cancelled) {
316          setCancelled(call);
317        } else {
318          if (channel == null) {
319            connect();
320          }
321          scheduleTimeoutTask(call);
322          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
323
324            @Override
325            public void operationComplete(ChannelFuture future) throws Exception {
326              // Fail the call if we failed to write it out. This usually because the channel is
327              // closed. This is needed because we may shutdown the channel inside event loop and
328              // there may still be some pending calls in the event loop queue after us.
329              if (!future.isSuccess()) {
330                call.setException(toIOE(future.cause()));
331              }
332            }
333          });
334        }
335      }
336    });
337  }
338
339  @Override
340  public void sendRequest(final Call call, HBaseRpcController hrc) {
341    execute(eventLoop, () -> {
342      try {
343        sendRequest0(call, hrc);
344      } catch (Exception e) {
345        call.setException(toIOE(e));
346      }
347    });
348  }
349}