001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import java.io.IOException; 021import java.net.InetAddress; 022import java.security.PrivilegedExceptionAction; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 025import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; 026import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; 027import org.apache.hadoop.security.UserGroupInformation; 028import org.apache.hadoop.security.token.Token; 029import org.apache.hadoop.security.token.TokenIdentifier; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 035import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 036import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 037import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 038 039/** 040 * Implement SASL logic for netty rpc client. 041 * @since 2.0.0 042 */ 043@InterfaceAudience.Private 044public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 045 046 private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class); 047 048 private final Promise<Boolean> saslPromise; 049 050 private final UserGroupInformation ugi; 051 052 private final NettyHBaseSaslRpcClient saslRpcClient; 053 054 private final Configuration conf; 055 056 // flag to mark if Crypto AES encryption is enable 057 private boolean needProcessConnectionHeader = false; 058 059 /** 060 * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to 061 * simple. 062 */ 063 public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, 064 SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token, 065 InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed, Configuration conf) 066 throws IOException { 067 this.saslPromise = saslPromise; 068 this.ugi = ugi; 069 this.conf = conf; 070 this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr, 071 securityInfo, fallbackAllowed, conf.get("hbase.rpc.protection", 072 SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase())); 073 } 074 075 private void writeResponse(ChannelHandlerContext ctx, byte[] response) { 076 LOG.trace("Sending token size={} from initSASLContext.", response.length); 077 ctx.writeAndFlush( 078 ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); 079 } 080 081 private void tryComplete(ChannelHandlerContext ctx) { 082 if (!saslRpcClient.isComplete()) { 083 return; 084 } 085 086 saslRpcClient.setupSaslHandler(ctx.pipeline()); 087 setCryptoAESOption(); 088 089 saslPromise.setSuccess(true); 090 } 091 092 private void setCryptoAESOption() { 093 boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop() 094 .equalsIgnoreCase(saslRpcClient.getSaslQOP()); 095 needProcessConnectionHeader = 096 saslEncryptionEnabled && conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false); 097 } 098 099 public boolean isNeedProcessConnectionHeader() { 100 return needProcessConnectionHeader; 101 } 102 103 @Override 104 public void handlerAdded(ChannelHandlerContext ctx) { 105 try { 106 byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 107 108 @Override 109 public byte[] run() throws Exception { 110 return saslRpcClient.getInitialResponse(); 111 } 112 }); 113 if (initialResponse != null) { 114 writeResponse(ctx, initialResponse); 115 } 116 tryComplete(ctx); 117 } catch (Exception e) { 118 // the exception thrown by handlerAdded will not be passed to the exceptionCaught below 119 // because netty will remove a handler if handlerAdded throws an exception. 120 exceptionCaught(ctx, e); 121 } 122 } 123 124 @Override 125 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 126 int len = msg.readInt(); 127 if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { 128 saslRpcClient.dispose(); 129 if (saslRpcClient.fallbackAllowed) { 130 saslPromise.trySuccess(false); 131 } else { 132 saslPromise.tryFailure(new FallbackDisallowedException()); 133 } 134 return; 135 } 136 LOG.trace("Reading input token size={} for processing by initSASLContext", len); 137 final byte[] challenge = new byte[len]; 138 msg.readBytes(challenge); 139 byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 140 141 @Override 142 public byte[] run() throws Exception { 143 return saslRpcClient.evaluateChallenge(challenge); 144 } 145 }); 146 if (response != null) { 147 writeResponse(ctx, response); 148 } 149 tryComplete(ctx); 150 } 151 152 @Override 153 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 154 saslRpcClient.dispose(); 155 saslPromise.tryFailure(new ConnectionClosedException("Connection closed")); 156 ctx.fireChannelInactive(); 157 } 158 159 @Override 160 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 161 saslRpcClient.dispose(); 162 saslPromise.tryFailure(cause); 163 } 164}