001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
020import java.io.IOException;
021import java.net.InetAddress;
022import java.security.PrivilegedExceptionAction;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
025import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
026import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
027import org.apache.hadoop.hbase.util.NettyFutureUtils;
028import org.apache.hadoop.security.UserGroupInformation;
029import org.apache.hadoop.security.token.Token;
030import org.apache.hadoop.security.token.TokenIdentifier;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
036import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
037import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
038import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
039import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
040
041/**
042 * Implement SASL logic for netty rpc client.
043 * @since 2.0.0
044 */
045@InterfaceAudience.Private
046public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
047
048  private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
049
050  public static final String HANDLER_NAME = "SaslRpcClientHandler";
051
052  private final Promise<Boolean> saslPromise;
053
054  private final UserGroupInformation ugi;
055
056  private final NettyHBaseSaslRpcClient saslRpcClient;
057
058  private final Configuration conf;
059
060  private final SaslClientAuthenticationProvider provider;
061
062  // flag to mark if Crypto AES encryption is enable
063  private boolean needProcessConnectionHeader = false;
064
065  /**
066   * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
067   *                    simple.
068   */
069  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
070    SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
071    InetAddress serverAddr, String serverPrincipal, boolean fallbackAllowed, Configuration conf)
072    throws IOException {
073    this.saslPromise = saslPromise;
074    this.ugi = ugi;
075    this.conf = conf;
076    this.provider = provider;
077    this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr,
078      serverPrincipal, fallbackAllowed, conf.get("hbase.rpc.protection",
079        SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
080  }
081
082  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
083    LOG.trace("Sending token size={} from initSASLContext.", response.length);
084    NettyFutureUtils.safeWriteAndFlush(ctx,
085      ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
086  }
087
088  private void tryComplete(ChannelHandlerContext ctx) {
089    if (!saslRpcClient.isComplete()) {
090      return;
091    }
092
093    // HBASE-23881 Clearly log when the client thinks that the SASL negotiation is complete.
094    if (LOG.isTraceEnabled()) {
095      LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName());
096    }
097    saslRpcClient.setupSaslHandler(ctx.pipeline(), HANDLER_NAME);
098    removeHandlers(ctx);
099
100    setCryptoAESOption();
101
102    saslPromise.setSuccess(true);
103  }
104
105  private void removeHandlers(ChannelHandlerContext ctx) {
106    ChannelPipeline p = ctx.pipeline();
107    p.remove(SaslChallengeDecoder.class);
108    p.remove(this);
109  }
110
111  private void setCryptoAESOption() {
112    boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
113      .equalsIgnoreCase(saslRpcClient.getSaslQOP());
114    needProcessConnectionHeader =
115      saslEncryptionEnabled && conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
116  }
117
118  public boolean isNeedProcessConnectionHeader() {
119    return needProcessConnectionHeader;
120  }
121
122  @Override
123  public void handlerAdded(ChannelHandlerContext ctx) {
124    // dispose the saslRpcClient when the channel is closed, since saslRpcClient is final, it is
125    // safe to reference it in lambda expr.
126    NettyFutureUtils.addListener(ctx.channel().closeFuture(), f -> saslRpcClient.dispose());
127    try {
128      byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
129
130        @Override
131        public byte[] run() throws Exception {
132          return saslRpcClient.getInitialResponse();
133        }
134      });
135      assert initialResponse != null;
136      writeResponse(ctx, initialResponse);
137      // HBASE-23881 We do not want to check if the SaslClient thinks the handshake is
138      // complete as, at this point, we've not heard a back from the server with it's reply
139      // to our first challenge response. We should wait for at least one reply
140      // from the server before calling negotiation complete.
141      //
142      // Each SASL mechanism has its own handshake. Some mechanisms calculate a single client buffer
143      // to be sent to the server while others have multiple exchanges to negotiate authentication.
144      // GSSAPI(Kerberos) and DIGEST-MD5 both are examples of mechanisms which have multiple steps.
145      // Mechanisms which have multiple steps will not return true on `SaslClient#isComplete()`
146      // until the handshake has fully completed. Mechanisms which only send a single buffer may
147      // return true on `isComplete()` after that initial response is calculated.
148
149      // HBASE-28337 We still want to check if the SaslClient completed the handshake, because
150      // there are certain mechs like PLAIN which doesn't have a server response after the
151      // initial authentication request. We cannot remove this tryComplete(), otherwise mechs
152      // like PLAIN will fail with call timeout.
153      tryComplete(ctx);
154    } catch (Exception e) {
155      // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
156      // because netty will remove a handler if handlerAdded throws an exception.
157      exceptionCaught(ctx, e);
158    }
159  }
160
161  @Override
162  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
163    int len = msg.readInt();
164    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
165      saslRpcClient.dispose();
166      if (saslRpcClient.fallbackAllowed) {
167        saslPromise.trySuccess(false);
168      } else {
169        saslPromise.tryFailure(new FallbackDisallowedException());
170      }
171      // When we switch to simple auth, we should also remove SaslChallengeDecoder and
172      // NettyHBaseSaslRpcClientHandler.
173      removeHandlers(ctx);
174      return;
175    }
176    LOG.trace("Reading input token size={} for processing by initSASLContext", len);
177    final byte[] challenge = new byte[len];
178    msg.readBytes(challenge);
179    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
180
181      @Override
182      public byte[] run() throws Exception {
183        return saslRpcClient.evaluateChallenge(challenge);
184      }
185    });
186    if (response != null) {
187      writeResponse(ctx, response);
188    } else {
189      LOG.trace("SASL challenge response was empty, not sending response to server.");
190    }
191    tryComplete(ctx);
192  }
193
194  @Override
195  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
196    saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
197    ctx.fireChannelInactive();
198  }
199
200  @Override
201  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
202    saslPromise.tryFailure(cause);
203  }
204}