001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
020import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
022import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
023import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
024import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
025
026import java.io.IOException;
027import java.net.InetAddress;
028import java.security.PrivilegedExceptionAction;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
035import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
036import org.apache.hadoop.security.UserGroupInformation;
037import org.apache.hadoop.security.token.Token;
038import org.apache.hadoop.security.token.TokenIdentifier;
039
040/**
041 * Implement SASL logic for netty rpc client.
042 * @since 2.0.0
043 */
044@InterfaceAudience.Private
045public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
046
047  private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
048
049  private final Promise<Boolean> saslPromise;
050
051  private final UserGroupInformation ugi;
052
053  private final NettyHBaseSaslRpcClient saslRpcClient;
054
055  private final Configuration conf;
056
057  // flag to mark if Crypto AES encryption is enable
058  private boolean needProcessConnectionHeader = false;
059
060  /**
061   * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
062   *          simple.
063   */
064  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
065      SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
066      InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed,
067      Configuration conf) throws IOException {
068    this.saslPromise = saslPromise;
069    this.ugi = ugi;
070    this.conf = conf;
071    this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr,
072        securityInfo, fallbackAllowed, conf.get(
073        "hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
074  }
075
076  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
077    LOG.trace("Sending token size={} from initSASLContext.", response.length);
078    ctx.writeAndFlush(
079      ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
080  }
081
082  private void tryComplete(ChannelHandlerContext ctx) {
083    if (!saslRpcClient.isComplete()) {
084      return;
085    }
086
087    saslRpcClient.setupSaslHandler(ctx.pipeline());
088    setCryptoAESOption();
089
090    saslPromise.setSuccess(true);
091  }
092
093  private void setCryptoAESOption() {
094    boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
095        getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
096    needProcessConnectionHeader = saslEncryptionEnabled && conf.getBoolean(
097        "hbase.rpc.crypto.encryption.aes.enabled", false);
098  }
099
100  public boolean isNeedProcessConnectionHeader() {
101    return needProcessConnectionHeader;
102  }
103
104  @Override
105  public void handlerAdded(ChannelHandlerContext ctx) {
106    try {
107      byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
108
109        @Override
110        public byte[] run() throws Exception {
111          return saslRpcClient.getInitialResponse();
112        }
113      });
114      if (initialResponse != null) {
115        writeResponse(ctx, initialResponse);
116      }
117      tryComplete(ctx);
118    } catch (Exception e) {
119      // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
120      // because netty will remove a handler if handlerAdded throws an exception.
121      exceptionCaught(ctx, e);
122    }
123  }
124
125  @Override
126  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
127    int len = msg.readInt();
128    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
129      saslRpcClient.dispose();
130      if (saslRpcClient.fallbackAllowed) {
131        saslPromise.trySuccess(false);
132      } else {
133        saslPromise.tryFailure(new FallbackDisallowedException());
134      }
135      return;
136    }
137    LOG.trace("Reading input token size={} for processing by initSASLContext", len);
138    final byte[] challenge = new byte[len];
139    msg.readBytes(challenge);
140    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
141
142      @Override
143      public byte[] run() throws Exception {
144        return saslRpcClient.evaluateChallenge(challenge);
145      }
146    });
147    if (response != null) {
148      writeResponse(ctx, response);
149    }
150    tryComplete(ctx);
151  }
152
153  @Override
154  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
155    saslRpcClient.dispose();
156    saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
157    ctx.fireChannelInactive();
158  }
159
160  @Override
161  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
162    saslRpcClient.dispose();
163    saslPromise.tryFailure(cause);
164  }
165}