/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;

/**
 * RPC connection implementation based on Netty.
 * <p/>
 * Most operations are executed in handlers. A Netty handler is always executed in the same
 * thread (EventLoop), so no locking is needed.
 * <p/>
 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
 * {@link #eventLoop} thread, otherwise there will be races.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcConnection extends RpcConnection {

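  // Threading note: the public entry points below (sendRequest, shutdown, cleanupConnection,
  // callTimeout) hop onto the event loop before touching connection state. A minimal sketch of
  // the pattern used throughout this class:
  //
  //   execute(eventLoop, () -> {
  //     // safe to access channel and other connection state here
  //   });
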
  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private final NettyRpcClient rpcClient;

  // The event loop used to set up the connection. We also execute other operations for this
  // connection in this event loop, to avoid locking everywhere.
  private final EventLoop eventLoop;

  private ByteBuf connectionHeaderPreamble;

  private ByteBuf connectionHeaderWithLength;

  // Volatile so the isActive method below can access this field without switching to the event
  // loop thread.
  private volatile Channel channel;

  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
      rpcClient.metrics);
    this.rpcClient = rpcClient;
    this.eventLoop = rpcClient.group.next();
    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
    this.connectionHeaderPreamble =
      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
    ConnectionHeader header = getConnectionHeader();
    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
  }

  @Override
  protected void callTimeout(Call call) {
    execute(eventLoop, () -> {
      if (channel != null) {
        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
      }
    });
  }

  @Override
  public boolean isActive() {
    return channel != null;
  }

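  // Closes the channel and clears the reference; must be called in the event loop.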
  private void shutdown0() {
    assert eventLoop.inEventLoop();
    if (channel != null) {
      NettyFutureUtils.consume(channel.close());
      channel = null;
    }
  }

  @Override
  public void shutdown() {
    execute(eventLoop, this::shutdown0);
  }

  @Override
  public void cleanupConnection() {
    execute(eventLoop, () -> {
      if (connectionHeaderPreamble != null) {
        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
        connectionHeaderPreamble = null;
      }
      if (connectionHeaderWithLength != null) {
        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
        connectionHeaderWithLength = null;
      }
    });
  }

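  /**
   * Called on the event loop once connection setup (preamble and optional SASL negotiation) has
   * finished: install the real handlers in front of {@link BufferCallBeforeInitHandler} and
   * signal it that setup succeeded so the calls buffered during setup can go out.
   */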
  private void established(Channel ch) throws IOException {
    assert eventLoop.inEventLoop();
    ch.pipeline()
      .addBefore(BufferCallBeforeInitHandler.NAME, null,
        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
      .addBefore(BufferCallBeforeInitHandler.NAME, null,
        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
      .addBefore(BufferCallBeforeInitHandler.NAME, null,
        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
      .fireUserEventTriggered(BufferCallEvent.success());
  }

  private boolean reloginInProgress;

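  /**
   * Schedule a SASL relogin on the shared {@code RELOGIN_EXECUTOR} after a random delay of at
   * most {@code reloginMaxBackoff} ms, with at most one attempt in flight at a time;
   * {@code reloginInProgress} is only accessed from the event loop.
   */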
  @SuppressWarnings("FutureReturnValueIgnored")
  private void scheduleRelogin(Throwable error) {
    assert eventLoop.inEventLoop();
    if (error instanceof FallbackDisallowedException) {
      return;
    }
    if (!provider.canRetry()) {
      LOG.trace("SASL Provider does not support retries");
      return;
    }
    if (reloginInProgress) {
      return;
    }
    reloginInProgress = true;
    RELOGIN_EXECUTOR.schedule(() -> {
      try {
        provider.relogin();
      } catch (IOException e) {
        LOG.warn("Relogin failed", e);
      }
      eventLoop.execute(() -> {
        reloginInProgress = false;
      });
    }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
  }

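  /**
   * Fail connection setup: let {@link BufferCallBeforeInitHandler} fail all buffered calls with
   * the given exception, then close the channel.
   */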
  private void failInit(Channel ch, IOException e) {
    assert eventLoop.inEventLoop();
    // fail all pending calls
    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
    shutdown0();
  }

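  /**
   * Run SASL negotiation on the newly connected channel. On success, either let a dedicated
   * handler negotiate the connection header with the server or send the header directly, then
   * mark the connection as established; on failure, schedule a relogin and fail connection setup.
   */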
  private void saslNegotiate(final Channel ch) {
    assert eventLoop.inEventLoop();
    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
    if (ticket == null) {
      failInit(ch, new FatalConnectionException("ticket/user is null"));
      return;
    }
    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
    final NettyHBaseSaslRpcClientHandler saslHandler;
    try {
      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
        ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
        rpcClient.fallbackAllowed, this.rpcClient.conf);
    } catch (IOException e) {
      failInit(ch, e);
      return;
    }
    ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
      .addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
    NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {

      @Override
      public void operationComplete(Future<Boolean> future) throws Exception {
        if (future.isSuccess()) {
          ChannelPipeline p = ch.pipeline();
          p.remove(SaslChallengeDecoder.class);
          p.remove(NettyHBaseSaslRpcClientHandler.class);

          // check whether we need to negotiate the connection header with the server
          if (saslHandler.isNeedProcessConnectionHeader()) {
            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
            // create the handler to handle the connection header
            NettyHBaseRpcConnectionHeaderHandler chHandler =
              new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
                connectionHeaderWithLength);

            // add a ReadTimeoutHandler in case the server does not respond with the connection
            // header, e.g. because of mismatched configuration between client and server
            final String readTimeoutHandlerName = "ReadTimeout";
            p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
              new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS))
              .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
            NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
              @Override
              public void operationComplete(Future<Boolean> future) throws Exception {
                if (future.isSuccess()) {
                  ChannelPipeline p = ch.pipeline();
                  p.remove(readTimeoutHandlerName);
                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
                  // no need to send the connection header here;
                  // NettyHBaseRpcConnectionHeaderHandler has already sent it
                  established(ch);
                } else {
                  final Throwable error = future.cause();
                  scheduleRelogin(error);
                  failInit(ch, toIOE(error));
                }
              }
            });
          } else {
            // send the connection header to the server
            NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
            established(ch);
          }
        } else {
          final Throwable error = future.cause();
          scheduleRelogin(error);
          failInit(ch, toIOE(error));
        }
      }
    });
  }

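  /**
   * Bootstrap a new channel to the remote address on this connection's event loop, send the
   * connection preamble, and then either start SASL negotiation or send the connection header
   * directly.
   */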
  private void connect() throws UnknownHostException {
    assert eventLoop.inEventLoop();
    LOG.trace("Connecting to {}", remoteId.getAddress());
    InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
      .handler(new ChannelInitializer<Channel>() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
          ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
            new BufferCallBeforeInitHandler());
        }
      }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
      .addListener(new ChannelFutureListener() {

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          Channel ch = future.channel();
          if (!future.isSuccess()) {
            failInit(ch, toIOE(future.cause()));
            rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
            return;
          }
          NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate());
          if (useSasl) {
            saslNegotiate(ch);
          } else {
            // send the connection header to the server
            NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
            established(ch);
          }
        }
      }).channel();
  }

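  /**
   * Send a call on the event loop: register cancellation callbacks, connect lazily if there is
   * no channel yet, schedule the call timeout, and write the call out, failing the call if the
   * write fails.
   */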
  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
    assert eventLoop.inEventLoop();
    if (reloginInProgress) {
      throw new IOException("Can not send request because relogin is in progress.");
    }
    hrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        if (channel != null) {
          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
        } else {
          if (channel == null) {
            connect();
          }
          scheduleTimeoutTask(call);
          NettyFutureUtils.addListener(channel.writeAndFlush(call), new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              // Fail the call if we failed to write it out. This is usually because the channel
              // is closed. This is needed because we may shut down the channel inside the event
              // loop, and there may still be pending calls queued in the event loop after us.
              if (!future.isSuccess()) {
                call.setException(toIOE(future.cause()));
              }
            }
          });
        }
      }
    });
  }

  @Override
  public void sendRequest(final Call call, HBaseRpcController hrc) {
    execute(eventLoop, () -> {
      try {
        sendRequest0(call, hrc);
      } catch (Exception e) {
        call.setException(toIOE(e));
      }
    });
  }
}