001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import java.io.IOException; 021import java.net.InetAddress; 022import java.security.PrivilegedExceptionAction; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 025import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; 026import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; 027import org.apache.hadoop.security.UserGroupInformation; 028import org.apache.hadoop.security.token.Token; 029import org.apache.hadoop.security.token.TokenIdentifier; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 035import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 036import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 037import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 038 039/** 040 * Implement SASL logic for netty rpc client. 041 * @since 2.0.0 042 */ 043@InterfaceAudience.Private 044public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 045 046 private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class); 047 048 private final Promise<Boolean> saslPromise; 049 050 private final UserGroupInformation ugi; 051 052 private final NettyHBaseSaslRpcClient saslRpcClient; 053 054 private final Configuration conf; 055 056 private final SaslClientAuthenticationProvider provider; 057 058 // flag to mark if Crypto AES encryption is enable 059 private boolean needProcessConnectionHeader = false; 060 061 /** 062 * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to 063 * simple. 064 */ 065 public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, 066 SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token, 067 InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed, Configuration conf) 068 throws IOException { 069 this.saslPromise = saslPromise; 070 this.ugi = ugi; 071 this.conf = conf; 072 this.provider = provider; 073 this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr, 074 securityInfo, fallbackAllowed, conf.get("hbase.rpc.protection", 075 SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase())); 076 } 077 078 private void writeResponse(ChannelHandlerContext ctx, byte[] response) { 079 LOG.trace("Sending token size={} from initSASLContext.", response.length); 080 ctx.writeAndFlush( 081 ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); 082 } 083 084 private void tryComplete(ChannelHandlerContext ctx) { 085 if (!saslRpcClient.isComplete()) { 086 return; 087 } 088 089 // HBASE-23881 Clearly log when the client thinks that the SASL negotiation is complete. 090 if (LOG.isTraceEnabled()) { 091 LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName()); 092 } 093 094 saslRpcClient.setupSaslHandler(ctx.pipeline()); 095 setCryptoAESOption(); 096 097 saslPromise.setSuccess(true); 098 } 099 100 private void setCryptoAESOption() { 101 boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop() 102 .equalsIgnoreCase(saslRpcClient.getSaslQOP()); 103 needProcessConnectionHeader = 104 saslEncryptionEnabled && conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false); 105 } 106 107 public boolean isNeedProcessConnectionHeader() { 108 return needProcessConnectionHeader; 109 } 110 111 @Override 112 public void handlerAdded(ChannelHandlerContext ctx) { 113 try { 114 byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 115 116 @Override 117 public byte[] run() throws Exception { 118 return saslRpcClient.getInitialResponse(); 119 } 120 }); 121 assert initialResponse != null; 122 writeResponse(ctx, initialResponse); 123 // HBASE-23881 We do not want to check if the SaslClient thinks the handshake is 124 // complete as, at this point, we've not heard a back from the server with it's reply 125 // to our first challenge response. We should wait for at least one reply 126 // from the server before calling negotiation complete. 127 // 128 // Each SASL mechanism has its own handshake. Some mechanisms calculate a single client buffer 129 // to be sent to the server while others have multiple exchanges to negotiate authentication. 130 // GSSAPI(Kerberos) and DIGEST-MD5 both are examples of mechanisms which have multiple steps. 131 // Mechanisms which have multiple steps will not return true on `SaslClient#isComplete()` 132 // until the handshake has fully completed. Mechanisms which only send a single buffer may 133 // return true on `isComplete()` after that initial response is calculated. 134 } catch (Exception e) { 135 // the exception thrown by handlerAdded will not be passed to the exceptionCaught below 136 // because netty will remove a handler if handlerAdded throws an exception. 137 exceptionCaught(ctx, e); 138 } 139 } 140 141 @Override 142 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 143 int len = msg.readInt(); 144 if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { 145 saslRpcClient.dispose(); 146 if (saslRpcClient.fallbackAllowed) { 147 saslPromise.trySuccess(false); 148 } else { 149 saslPromise.tryFailure(new FallbackDisallowedException()); 150 } 151 return; 152 } 153 LOG.trace("Reading input token size={} for processing by initSASLContext", len); 154 final byte[] challenge = new byte[len]; 155 msg.readBytes(challenge); 156 byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 157 158 @Override 159 public byte[] run() throws Exception { 160 return saslRpcClient.evaluateChallenge(challenge); 161 } 162 }); 163 if (response != null) { 164 writeResponse(ctx, response); 165 } else { 166 LOG.trace("SASL challenge response was empty, not sending response to server."); 167 } 168 tryComplete(ctx); 169 } 170 171 @Override 172 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 173 saslRpcClient.dispose(); 174 saslPromise.tryFailure(new ConnectionClosedException("Connection closed")); 175 ctx.fireChannelInactive(); 176 } 177 178 @Override 179 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 180 saslRpcClient.dispose(); 181 saslPromise.tryFailure(cause); 182 } 183}