001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
import java.io.IOException;
import java.net.InetAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
038
039/**
040 * Implement SASL logic for netty rpc client.
041 * @since 2.0.0
042 */
043@InterfaceAudience.Private
044public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
045
046  private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
047
048  private final Promise<Boolean> saslPromise;
049
050  private final UserGroupInformation ugi;
051
052  private final NettyHBaseSaslRpcClient saslRpcClient;
053
054  private final Configuration conf;
055
056  private final SaslClientAuthenticationProvider provider;
057
058  // flag to mark if Crypto AES encryption is enable
059  private boolean needProcessConnectionHeader = false;
060
061  /**
062   * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
063   *                    simple.
064   */
065  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
066    SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
067    InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed, Configuration conf)
068    throws IOException {
069    this.saslPromise = saslPromise;
070    this.ugi = ugi;
071    this.conf = conf;
072    this.provider = provider;
073    this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr,
074      securityInfo, fallbackAllowed, conf.get("hbase.rpc.protection",
075        SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
076  }
077
078  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
079    LOG.trace("Sending token size={} from initSASLContext.", response.length);
080    ctx.writeAndFlush(
081      ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
082  }
083
084  private void tryComplete(ChannelHandlerContext ctx) {
085    if (!saslRpcClient.isComplete()) {
086      return;
087    }
088
089    // HBASE-23881 Clearly log when the client thinks that the SASL negotiation is complete.
090    if (LOG.isTraceEnabled()) {
091      LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName());
092    }
093
094    saslRpcClient.setupSaslHandler(ctx.pipeline());
095    setCryptoAESOption();
096
097    saslPromise.setSuccess(true);
098  }
099
100  private void setCryptoAESOption() {
101    boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
102      .equalsIgnoreCase(saslRpcClient.getSaslQOP());
103    needProcessConnectionHeader =
104      saslEncryptionEnabled && conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
105  }
106
107  public boolean isNeedProcessConnectionHeader() {
108    return needProcessConnectionHeader;
109  }
110
111  @Override
112  public void handlerAdded(ChannelHandlerContext ctx) {
113    try {
114      byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
115
116        @Override
117        public byte[] run() throws Exception {
118          return saslRpcClient.getInitialResponse();
119        }
120      });
121      assert initialResponse != null;
122      writeResponse(ctx, initialResponse);
123      // HBASE-23881 We do not want to check if the SaslClient thinks the handshake is
124      // complete as, at this point, we've not heard a back from the server with it's reply
125      // to our first challenge response. We should wait for at least one reply
126      // from the server before calling negotiation complete.
127      //
128      // Each SASL mechanism has its own handshake. Some mechanisms calculate a single client buffer
129      // to be sent to the server while others have multiple exchanges to negotiate authentication.
130      // GSSAPI(Kerberos) and DIGEST-MD5 both are examples of mechanisms which have multiple steps.
131      // Mechanisms which have multiple steps will not return true on `SaslClient#isComplete()`
132      // until the handshake has fully completed. Mechanisms which only send a single buffer may
133      // return true on `isComplete()` after that initial response is calculated.
134    } catch (Exception e) {
135      // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
136      // because netty will remove a handler if handlerAdded throws an exception.
137      exceptionCaught(ctx, e);
138    }
139  }
140
141  @Override
142  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
143    int len = msg.readInt();
144    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
145      saslRpcClient.dispose();
146      if (saslRpcClient.fallbackAllowed) {
147        saslPromise.trySuccess(false);
148      } else {
149        saslPromise.tryFailure(new FallbackDisallowedException());
150      }
151      return;
152    }
153    LOG.trace("Reading input token size={} for processing by initSASLContext", len);
154    final byte[] challenge = new byte[len];
155    msg.readBytes(challenge);
156    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
157
158      @Override
159      public byte[] run() throws Exception {
160        return saslRpcClient.evaluateChallenge(challenge);
161      }
162    });
163    if (response != null) {
164      writeResponse(ctx, response);
165    } else {
166      LOG.trace("SASL challenge response was empty, not sending response to server.");
167    }
168    tryComplete(ctx);
169  }
170
171  @Override
172  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
173    saslRpcClient.dispose();
174    saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
175    ctx.fireChannelInactive();
176  }
177
178  @Override
179  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
180    saslRpcClient.dispose();
181    saslPromise.tryFailure(cause);
182  }
183}