001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
037
038/**
039 * Implement SASL logic for netty rpc client.
040 * @since 2.0.0
041 */
042@InterfaceAudience.Private
043public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
044
045  private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);
046
047  private final Promise<Boolean> saslPromise;
048
049  private final UserGroupInformation ugi;
050
051  private final NettyHBaseSaslRpcClient saslRpcClient;
052
053  private final Configuration conf;
054
055  // flag to mark if Crypto AES encryption is enable
056  private boolean needProcessConnectionHeader = false;
057
058  /**
059   * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
060   *          simple.
061   */
062  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
063      AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal,
064      boolean fallbackAllowed, Configuration conf)
065      throws IOException {
066    this.saslPromise = saslPromise;
067    this.ugi = ugi;
068    this.conf = conf;
069    this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal,
070        fallbackAllowed, conf.get(
071        "hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
072  }
073
074  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
075    LOG.trace("Sending token size={} from initSASLContext.", response.length);
076    ctx.writeAndFlush(
077      ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
078  }
079
080  private void tryComplete(ChannelHandlerContext ctx) {
081    if (!saslRpcClient.isComplete()) {
082      return;
083    }
084
085    saslRpcClient.setupSaslHandler(ctx.pipeline());
086    setCryptoAESOption();
087
088    saslPromise.setSuccess(true);
089  }
090
091  private void setCryptoAESOption() {
092    boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
093        getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
094    needProcessConnectionHeader = saslEncryptionEnabled && conf.getBoolean(
095        "hbase.rpc.crypto.encryption.aes.enabled", false);
096  }
097
098  public boolean isNeedProcessConnectionHeader() {
099    return needProcessConnectionHeader;
100  }
101
102  @Override
103  public void handlerAdded(ChannelHandlerContext ctx) {
104    try {
105      byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
106
107        @Override
108        public byte[] run() throws Exception {
109          return saslRpcClient.getInitialResponse();
110        }
111      });
112      if (initialResponse != null) {
113        writeResponse(ctx, initialResponse);
114      }
115      tryComplete(ctx);
116    } catch (Exception e) {
117      // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
118      // because netty will remove a handler if handlerAdded throws an exception.
119      exceptionCaught(ctx, e);
120    }
121  }
122
123  @Override
124  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
125    int len = msg.readInt();
126    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
127      saslRpcClient.dispose();
128      if (saslRpcClient.fallbackAllowed) {
129        saslPromise.trySuccess(false);
130      } else {
131        saslPromise.tryFailure(new FallbackDisallowedException());
132      }
133      return;
134    }
135    LOG.trace("Reading input token size={} for processing by initSASLContext", len);
136    final byte[] challenge = new byte[len];
137    msg.readBytes(challenge);
138    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
139
140      @Override
141      public byte[] run() throws Exception {
142        return saslRpcClient.evaluateChallenge(challenge);
143      }
144    });
145    if (response != null) {
146      writeResponse(ctx, response);
147    }
148    tryComplete(ctx);
149  }
150
151  @Override
152  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
153    saslRpcClient.dispose();
154    saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
155    ctx.fireChannelInactive();
156  }
157
158  @Override
159  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
160    saslRpcClient.dispose();
161    saslPromise.tryFailure(cause);
162  }
163}