/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.security;

import java.io.IOException;
import java.net.InetAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Implement SASL logic for netty rpc client.
 * <p>
 * When the handler is added to a pipeline it sends the initial SASL response. Every inbound
 * {@link ByteBuf} is then read as a 4-byte integer which is either
 * {@link SaslUtil#SWITCH_TO_SIMPLE_AUTH} or the length of the challenge bytes that follow. The
 * outcome of the negotiation is reported through the {@code saslPromise} given to the
 * constructor.
 * @since 2.0.0
 */
@InterfaceAudience.Private
public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

  private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class);

  public static final String HANDLER_NAME = "SaslRpcClientHandler";

  private final Promise<Boolean> saslPromise;

  private final UserGroupInformation ugi;

  private final NettyHBaseSaslRpcClient saslRpcClient;

  private final Configuration conf;

  private final SaslClientAuthenticationProvider provider;

  // flag to mark if Crypto AES encryption is enabled; it is only set once the negotiation has
  // completed, see setCryptoAESOption()
  private boolean needProcessConnectionHeader = false;

  /**
   * Creates the handler together with the {@link NettyHBaseSaslRpcClient} it drives.
   * @param saslPromise     {@code true} if success, {@code false} if server tells us to fallback
   *                        to simple.
   * @param ugi             the user whose privileges are used (via {@code doAs}) when producing
   *                        the initial response and when evaluating challenges
   * @param provider        authentication provider handed to the SASL client; its auth method
   *                        name is also used for trace logging
   * @param token           handed to the SASL client as-is
   * @param serverAddr      handed to the SASL client as-is
   * @param serverPrincipal handed to the SASL client as-is
   * @param fallbackAllowed handed to the SASL client as-is
   * @param conf            configuration; {@code hbase.rpc.protection} and
   *                        {@code hbase.rpc.crypto.encryption.aes.enabled} are read from it
   * @throws IOException declared by this constructor; presumably raised while constructing the
   *                     underlying SASL client (not visible from this file)
   */
  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
    SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token,
    InetAddress serverAddr, String serverPrincipal, boolean fallbackAllowed, Configuration conf)
    throws IOException {
    this.saslPromise = saslPromise;
    this.ugi = ugi;
    this.conf = conf;
    this.provider = provider;
    // Lower-case with Locale.ROOT: the no-arg toLowerCase() uses the JVM default locale, and e.g.
    // under a Turkish locale "AUTHENTICATION" would be lower-cased with a dotless 'i', producing
    // a default value that is not the expected ASCII string.
    this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr,
      serverPrincipal, fallbackAllowed, conf.get("hbase.rpc.protection",
        SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
  }

  /**
   * Writes {@code response} to the channel as a 4-byte length followed by the raw bytes, and
   * flushes.
   */
  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
    LOG.trace("Sending token size={} from initSASLContext.", response.length);
    NettyFutureUtils.safeWriteAndFlush(ctx,
      ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
  }

  /**
   * If the SASL client reports that the negotiation is complete, installs the SASL handler on the
   * pipeline, removes the negotiation handlers, records whether the connection header needs
   * processing and completes the promise with {@code true}. Does nothing otherwise.
   */
  private void tryComplete(ChannelHandlerContext ctx) {
    if (!saslRpcClient.isComplete()) {
      return;
    }

    // HBASE-23881 Clearly log when the client thinks that the SASL negotiation is complete.
    if (LOG.isTraceEnabled()) {
      LOG.trace("SASL negotiation for {} is complete", provider.getSaslAuthMethod().getName());
    }
    saslRpcClient.setupSaslHandler(ctx.pipeline(), HANDLER_NAME);
    removeHandlers(ctx);

    setCryptoAESOption();

    saslPromise.setSuccess(true);
  }

  /**
   * Removes the {@link SaslChallengeDecoder} and this handler from the pipeline.
   */
  private void removeHandlers(ChannelHandlerContext ctx) {
    ChannelPipeline p = ctx.pipeline();
    p.remove(SaslChallengeDecoder.class);
    p.remove(this);
  }

  /**
   * Sets {@link #needProcessConnectionHeader} to {@code true} only when the negotiated QOP equals
   * the privacy QOP (ignoring case) and {@code hbase.rpc.crypto.encryption.aes.enabled} is set.
   */
  private void setCryptoAESOption() {
    boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
      .equalsIgnoreCase(saslRpcClient.getSaslQOP());
    needProcessConnectionHeader =
      saslEncryptionEnabled && conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
  }

  /**
   * Returns the flag computed by {@link #setCryptoAESOption()}; it stays {@code false} until the
   * negotiation has completed.
   */
  public boolean isNeedProcessConnectionHeader() {
    return needProcessConnectionHeader;
  }

  /**
   * Registers disposal of the SASL client on channel close and sends the initial response. Any
   * exception is routed to {@link #exceptionCaught(ChannelHandlerContext, Throwable)} so that the
   * promise is failed.
   */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) {
    // dispose the saslRpcClient when the channel is closed, since saslRpcClient is final, it is
    // safe to reference it in lambda expr.
    NettyFutureUtils.addListener(ctx.channel().closeFuture(), f -> saslRpcClient.dispose());
    try {
      byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {

        @Override
        public byte[] run() throws Exception {
          return saslRpcClient.getInitialResponse();
        }
      });
      assert initialResponse != null;
      writeResponse(ctx, initialResponse);
      // HBASE-23881 We do not want to check if the SaslClient thinks the handshake is
      // complete as, at this point, we've not heard back from the server with its reply
      // to our first challenge response. We should wait for at least one reply
      // from the server before calling negotiation complete.
      //
      // Each SASL mechanism has its own handshake. Some mechanisms calculate a single client buffer
      // to be sent to the server while others have multiple exchanges to negotiate authentication.
      // GSSAPI(Kerberos) and DIGEST-MD5 both are examples of mechanisms which have multiple steps.
      // Mechanisms which have multiple steps will not return true on `SaslClient#isComplete()`
      // until the handshake has fully completed. Mechanisms which only send a single buffer may
      // return true on `isComplete()` after that initial response is calculated.

      // HBASE-28337 We still want to check if the SaslClient completed the handshake, because
      // there are certain mechs like PLAIN which don't have a server response after the
      // initial authentication request. We cannot remove this tryComplete(), otherwise mechs
      // like PLAIN will fail with call timeout.
      tryComplete(ctx);
    } catch (Exception e) {
      // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
      // because netty will remove a handler if handlerAdded throws an exception.
      exceptionCaught(ctx, e);
    }
  }

  /**
   * Handles one message from the server. The leading int is either
   * {@link SaslUtil#SWITCH_TO_SIMPLE_AUTH}, in which case the promise is completed with
   * {@code false} or failed depending on whether fallback is allowed, or the length of the
   * challenge that follows, which is evaluated as {@code ugi} and answered if the SASL client
   * produced a response.
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    int len = msg.readInt();
    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
      saslRpcClient.dispose();
      if (saslRpcClient.fallbackAllowed) {
        saslPromise.trySuccess(false);
      } else {
        saslPromise.tryFailure(new FallbackDisallowedException());
      }
      // When we switch to simple auth, we should also remove SaslChallengeDecoder and
      // NettyHBaseSaslRpcClientHandler.
      removeHandlers(ctx);
      return;
    }
    LOG.trace("Reading input token size={} for processing by initSASLContext", len);
    final byte[] challenge = new byte[len];
    msg.readBytes(challenge);
    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {

      @Override
      public byte[] run() throws Exception {
        return saslRpcClient.evaluateChallenge(challenge);
      }
    });
    if (response != null) {
      writeResponse(ctx, response);
    } else {
      LOG.trace("SASL challenge response was empty, not sending response to server.");
    }
    tryComplete(ctx);
  }

  /**
   * Fails the promise (if it is not already completed) because the connection was closed, then
   * forwards the event to the next handler.
   */
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
    ctx.fireChannelInactive();
  }

  /**
   * Fails the promise with {@code cause} if it is not already completed. The event is not
   * propagated further down the pipeline.
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    saslPromise.tryFailure(cause);
  }
}