001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
025
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.net.UnknownHostException;
029import java.util.Set;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ScheduledExecutorService;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.concurrent.TimeUnit;
034import javax.security.sasl.SaslException;
035import org.apache.hadoop.hbase.client.ConnectionUtils;
036import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
037import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
038import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
039import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
040import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
041import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
042import org.apache.hadoop.hbase.util.NettyFutureUtils;
043import org.apache.hadoop.hbase.util.Threads;
044import org.apache.hadoop.security.UserGroupInformation;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
051import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
052import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
053import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
054import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
055import org.apache.hbase.thirdparty.io.netty.channel.Channel;
056import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
057import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
058import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
060import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
061import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
062import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
063import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
064import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
065import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
066import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
067import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
068import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
069import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
070import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
071
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
073
074/**
075 * RPC connection implementation based on netty.
076 * <p/>
077 * Most operations are executed in handlers. Netty handler is always executed in the same
078 * thread(EventLoop) so no lock is needed.
079 * <p/>
080 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
081 * {@link #eventLoop} thread, otherwise there will be races.
082 * @since 2.0.0
083 */
084@InterfaceAudience.Private
085class NettyRpcConnection extends RpcConnection {
086
087  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
088
089  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
090    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
091      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
092
093  private final NettyRpcClient rpcClient;
094
095  // the event loop used to set up the connection, we will also execute other operations for this
096  // connection in this event loop, to avoid locking everywhere.
097  private final EventLoop eventLoop;
098
099  private ByteBuf connectionHeaderPreamble;
100
101  private ByteBuf connectionHeaderWithLength;
102
103  // make it volatile so in the isActive method below we do not need to switch to the event loop
104  // thread to access this field.
105  private volatile Channel channel;
106
107  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
108    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
109      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
110      rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes);
111    this.rpcClient = rpcClient;
112    this.eventLoop = rpcClient.group.next();
113    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
114    this.connectionHeaderPreamble =
115      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
116    ConnectionHeader header = getConnectionHeader();
117    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
118    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
119    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
120  }
121
122  @Override
123  protected void callTimeout(Call call) {
124    execute(eventLoop, () -> {
125      if (channel != null) {
126        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
127      }
128    });
129  }
130
131  @Override
132  public boolean isActive() {
133    return channel != null;
134  }
135
136  private void shutdown0() {
137    assert eventLoop.inEventLoop();
138    if (channel != null) {
139      channel.close();
140      channel = null;
141    }
142  }
143
144  @Override
145  public void shutdown() {
146    execute(eventLoop, this::shutdown0);
147  }
148
149  @Override
150  public void cleanupConnection() {
151    execute(eventLoop, () -> {
152      if (connectionHeaderPreamble != null) {
153        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
154        connectionHeaderPreamble = null;
155      }
156      if (connectionHeaderWithLength != null) {
157        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
158        connectionHeaderWithLength = null;
159      }
160    });
161  }
162
163  private void established(Channel ch) {
164    assert eventLoop.inEventLoop();
165    ch.pipeline()
166      .addBefore(BufferCallBeforeInitHandler.NAME, null,
167        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
168      .addBefore(BufferCallBeforeInitHandler.NAME, null,
169        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
170      .addBefore(BufferCallBeforeInitHandler.NAME, null,
171        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
172      .fireUserEventTriggered(BufferCallEvent.success());
173  }
174
175  private void saslEstablished(Channel ch, String serverPrincipal) {
176    saslNegotiationDone(serverPrincipal, true);
177    established(ch);
178  }
179
180  private boolean reloginInProgress;
181
182  private void scheduleRelogin(Throwable error) {
183    assert eventLoop.inEventLoop();
184    if (error instanceof FallbackDisallowedException) {
185      return;
186    }
187    if (!provider.canRetry()) {
188      LOG.trace("SASL Provider does not support retries");
189      return;
190    }
191    if (reloginInProgress) {
192      return;
193    }
194    reloginInProgress = true;
195    RELOGIN_EXECUTOR.schedule(() -> {
196      try {
197        provider.relogin();
198      } catch (IOException e) {
199        LOG.warn("Relogin failed", e);
200      }
201      eventLoop.execute(() -> {
202        reloginInProgress = false;
203      });
204    }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
205  }
206
207  private void failInit(Channel ch, IOException e) {
208    assert eventLoop.inEventLoop();
209    // fail all pending calls
210    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
211    shutdown0();
212    rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), e);
213  }
214
215  private void saslFailInit(Channel ch, String serverPrincipal, IOException error) {
216    assert eventLoop.inEventLoop();
217    saslNegotiationDone(serverPrincipal, false);
218    failInit(ch, error);
219  }
220
221  private void saslNegotiate(Channel ch, String serverPrincipal) {
222    assert eventLoop.inEventLoop();
223    NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate());
224    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
225    if (ticket == null) {
226      saslFailInit(ch, serverPrincipal, new FatalConnectionException("ticket/user is null"));
227      return;
228    }
229    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
230    final NettyHBaseSaslRpcClientHandler saslHandler;
231    try {
232      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
233        ((InetSocketAddress) ch.remoteAddress()).getAddress(), serverPrincipal,
234        rpcClient.fallbackAllowed, this.rpcClient.conf);
235    } catch (IOException e) {
236      saslFailInit(ch, serverPrincipal, e);
237      return;
238    }
239    ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
240      .addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME,
241        saslHandler);
242    NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
243
244      @Override
245      public void operationComplete(Future<Boolean> future) throws Exception {
246        if (future.isSuccess()) {
247          ChannelPipeline p = ch.pipeline();
248          // check if negotiate with server for connection header is necessary
249          if (saslHandler.isNeedProcessConnectionHeader()) {
250            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
251            // create the handler to handle the connection header
252            NettyHBaseRpcConnectionHeaderHandler chHandler =
253              new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
254                connectionHeaderWithLength);
255
256            // add ReadTimeoutHandler to deal with server doesn't response connection header
257            // because of the different configuration in client side and server side
258            final String readTimeoutHandlerName = "ReadTimeout";
259            p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
260              new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS))
261              .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
262            NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
263              @Override
264              public void operationComplete(Future<Boolean> future) throws Exception {
265                if (future.isSuccess()) {
266                  ChannelPipeline p = ch.pipeline();
267                  p.remove(readTimeoutHandlerName);
268                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
269                  // don't send connection header, NettyHBaseRpcConnectionHeaderHandler
270                  // sent it already
271                  saslEstablished(ch, serverPrincipal);
272                } else {
273                  final Throwable error = future.cause();
274                  scheduleRelogin(error);
275                  saslFailInit(ch, serverPrincipal, toIOE(error));
276                }
277              }
278            });
279          } else {
280            // send the connection header to server
281            ch.write(connectionHeaderWithLength.retainedDuplicate());
282            saslEstablished(ch, serverPrincipal);
283          }
284        } else {
285          final Throwable error = future.cause();
286          scheduleRelogin(error);
287          saslFailInit(ch, serverPrincipal, toIOE(error));
288        }
289      }
290    });
291  }
292
293  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
294    assert eventLoop.inEventLoop();
295    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
296      RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
297  }
298
299  private void onSecurityPreambleError(Channel ch, Set<String> serverPrincipals,
300    IOException error) {
301    assert eventLoop.inEventLoop();
302    LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address, error);
303    if (ConnectionUtils.isUnexpectedPreambleHeaderException(error)) {
304      // this means we are connecting to an old server which does not support the security
305      // preamble call, so we should fallback to randomly select a principal to use
306      // TODO: find a way to reconnect without failing all the pending calls, for now, when we
307      // reach here, shutdown should have already been scheduled
308      return;
309    }
310    if (IPCUtil.isSecurityNotEnabledException(error)) {
311      // server tells us security is not enabled, then we should check whether fallback to
312      // simple is allowed, if so we just go without security, otherwise we should fail the
313      // negotiation immediately
314      if (rpcClient.fallbackAllowed) {
315        // TODO: just change the preamble and skip the fallback to simple logic, for now, just
316        // select the first principal can finish the connection setup, but waste one client
317        // message
318        saslNegotiate(ch, serverPrincipals.iterator().next());
319      } else {
320        failInit(ch, new FallbackDisallowedException());
321      }
322      return;
323    }
324    // usually we should not reach here, but for robust, just randomly select a principal to
325    // connect
326    saslNegotiate(ch, randomSelect(serverPrincipals));
327  }
328
329  private void onSecurityPreambleFinish(Channel ch, Set<String> serverPrincipals,
330    Call securityPreambleCall) {
331    assert eventLoop.inEventLoop();
332    String serverPrincipal;
333    try {
334      serverPrincipal = chooseServerPrincipal(serverPrincipals, securityPreambleCall);
335    } catch (SaslException e) {
336      failInit(ch, e);
337      return;
338    }
339    saslNegotiate(ch, serverPrincipal);
340  }
341
342  private void saslNegotiate(Channel ch) throws IOException {
343    assert eventLoop.inEventLoop();
344    Set<String> serverPrincipals = getServerPrincipals();
345    if (serverPrincipals.size() == 1) {
346      saslNegotiate(ch, serverPrincipals.iterator().next());
347      return;
348    }
349    // this means we use kerberos authentication and there are multiple server principal candidates,
350    // in this way we need to send a special preamble header to get server principal from server
351    Call securityPreambleCall = createSecurityPreambleCall(call -> {
352      assert eventLoop.inEventLoop();
353      if (call.error != null) {
354        onSecurityPreambleError(ch, serverPrincipals, call.error);
355      } else {
356        onSecurityPreambleFinish(ch, serverPrincipals, call);
357      }
358    });
359    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
360      RpcClient.SECURITY_PREAMBLE_HEADER, securityPreambleCall);
361  }
362
363  private void connect(Call connectionRegistryCall) throws UnknownHostException {
364    assert eventLoop.inEventLoop();
365    LOG.trace("Connecting to {}", remoteId.getAddress());
366    InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
367    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
368      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
369      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
370      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
371      .handler(new ChannelInitializer<Channel>() {
372        @Override
373        protected void initChannel(Channel ch) throws Exception {
374          if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
375            SslContext sslContext = rpcClient.getSslContext();
376            SslHandler sslHandler = sslContext.newHandler(ch.alloc(),
377              remoteId.address.getHostName(), remoteId.address.getPort());
378            sslHandler.setHandshakeTimeoutMillis(
379              conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT,
380                X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS));
381            ch.pipeline().addFirst(sslHandler);
382            LOG.debug("SSL handler added with handshake timeout {} ms",
383              sslHandler.getHandshakeTimeoutMillis());
384          }
385          ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
386            new BufferCallBeforeInitHandler());
387        }
388      }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
389      .addListener(new ChannelFutureListener() {
390
391        private void succeed(Channel ch) throws IOException {
392          if (connectionRegistryCall != null) {
393            getConnectionRegistry(ch, connectionRegistryCall);
394            return;
395          }
396          if (!useSasl) {
397            // BufferCallBeforeInitHandler will call ctx.flush when receiving the
398            // BufferCallEvent.success() event, so here we just use write for the below two messages
399            NettyFutureUtils.safeWrite(ch, connectionHeaderPreamble.retainedDuplicate());
400            NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
401            established(ch);
402          } else {
403            saslNegotiate(ch);
404          }
405        }
406
407        private void fail(Channel ch, Throwable error) {
408          IOException ex = toIOE(error);
409          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
410            ex);
411          if (connectionRegistryCall != null) {
412            connectionRegistryCall.setException(ex);
413          }
414          failInit(ch, ex);
415        }
416
417        @Override
418        public void operationComplete(ChannelFuture future) throws Exception {
419          Channel ch = future.channel();
420          if (!future.isSuccess()) {
421            fail(ch, future.cause());
422            return;
423          }
424          SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
425          if (sslHandler != null) {
426            NettyFutureUtils.addListener(sslHandler.handshakeFuture(), f -> {
427              if (f.isSuccess()) {
428                succeed(ch);
429              } else {
430                fail(ch, f.cause());
431              }
432            });
433          } else {
434            succeed(ch);
435          }
436        }
437      }).channel();
438  }
439
440  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
441    assert eventLoop.inEventLoop();
442    if (call.isConnectionRegistryCall()) {
443      // For get connection registry call, we will send a special preamble header to get the
444      // response, instead of sending a real rpc call. See HBASE-25051
445      connect(call);
446      return;
447    }
448    if (reloginInProgress) {
449      throw new IOException(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS);
450    }
451    hrc.notifyOnCancel(new RpcCallback<Object>() {
452
453      @Override
454      public void run(Object parameter) {
455        setCancelled(call);
456        if (channel != null) {
457          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
458        }
459      }
460    }, new CancellationCallback() {
461
462      @Override
463      public void run(boolean cancelled) throws IOException {
464        if (cancelled) {
465          setCancelled(call);
466        } else {
467          if (channel == null) {
468            connect(null);
469          }
470          scheduleTimeoutTask(call);
471          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
472
473            @Override
474            public void operationComplete(ChannelFuture future) throws Exception {
475              // Fail the call if we failed to write it out. This usually because the channel is
476              // closed. This is needed because we may shutdown the channel inside event loop and
477              // there may still be some pending calls in the event loop queue after us.
478              if (!future.isSuccess()) {
479                call.setException(toIOE(future.cause()));
480              }
481            }
482          });
483        }
484      }
485    });
486  }
487
488  @Override
489  public void sendRequest(final Call call, HBaseRpcController hrc) {
490    execute(eventLoop, () -> {
491      try {
492        sendRequest0(call, hrc);
493      } catch (Exception e) {
494        call.setException(toIOE(e));
495      }
496    });
497  }
498}