001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 022import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 023import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 024import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 025 026import java.io.IOException; 027import java.net.InetAddress; 028import java.security.PrivilegedExceptionAction; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; 035import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; 036import org.apache.hadoop.security.UserGroupInformation; 037import org.apache.hadoop.security.token.Token; 038import org.apache.hadoop.security.token.TokenIdentifier; 039 040/** 041 * Implement SASL logic for netty rpc client. 042 * @since 2.0.0 043 */ 044@InterfaceAudience.Private 045public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 046 047 private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class); 048 049 private final Promise<Boolean> saslPromise; 050 051 private final UserGroupInformation ugi; 052 053 private final NettyHBaseSaslRpcClient saslRpcClient; 054 055 private final Configuration conf; 056 057 // flag to mark if Crypto AES encryption is enable 058 private boolean needProcessConnectionHeader = false; 059 060 /** 061 * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to 062 * simple. 063 */ 064 public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, 065 SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token, 066 InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed, 067 Configuration conf) throws IOException { 068 this.saslPromise = saslPromise; 069 this.ugi = ugi; 070 this.conf = conf; 071 this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr, 072 securityInfo, fallbackAllowed, conf.get( 073 "hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase())); 074 } 075 076 private void writeResponse(ChannelHandlerContext ctx, byte[] response) { 077 LOG.trace("Sending token size={} from initSASLContext.", response.length); 078 ctx.writeAndFlush( 079 ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); 080 } 081 082 private void tryComplete(ChannelHandlerContext ctx) { 083 if (!saslRpcClient.isComplete()) { 084 return; 085 } 086 087 saslRpcClient.setupSaslHandler(ctx.pipeline()); 088 setCryptoAESOption(); 089 090 saslPromise.setSuccess(true); 091 } 092 093 private void setCryptoAESOption() { 094 boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY. 095 getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP()); 096 needProcessConnectionHeader = saslEncryptionEnabled && conf.getBoolean( 097 "hbase.rpc.crypto.encryption.aes.enabled", false); 098 } 099 100 public boolean isNeedProcessConnectionHeader() { 101 return needProcessConnectionHeader; 102 } 103 104 @Override 105 public void handlerAdded(ChannelHandlerContext ctx) { 106 try { 107 byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 108 109 @Override 110 public byte[] run() throws Exception { 111 return saslRpcClient.getInitialResponse(); 112 } 113 }); 114 if (initialResponse != null) { 115 writeResponse(ctx, initialResponse); 116 } 117 tryComplete(ctx); 118 } catch (Exception e) { 119 // the exception thrown by handlerAdded will not be passed to the exceptionCaught below 120 // because netty will remove a handler if handlerAdded throws an exception. 121 exceptionCaught(ctx, e); 122 } 123 } 124 125 @Override 126 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 127 int len = msg.readInt(); 128 if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { 129 saslRpcClient.dispose(); 130 if (saslRpcClient.fallbackAllowed) { 131 saslPromise.trySuccess(false); 132 } else { 133 saslPromise.tryFailure(new FallbackDisallowedException()); 134 } 135 return; 136 } 137 LOG.trace("Reading input token size={} for processing by initSASLContext", len); 138 final byte[] challenge = new byte[len]; 139 msg.readBytes(challenge); 140 byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 141 142 @Override 143 public byte[] run() throws Exception { 144 return saslRpcClient.evaluateChallenge(challenge); 145 } 146 }); 147 if (response != null) { 148 writeResponse(ctx, response); 149 } 150 tryComplete(ctx); 151 } 152 153 @Override 154 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 155 saslRpcClient.dispose(); 156 saslPromise.tryFailure(new ConnectionClosedException("Connection closed")); 157 ctx.fireChannelInactive(); 158 } 159 160 @Override 161 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 162 saslRpcClient.dispose(); 163 saslPromise.tryFailure(cause); 164 } 165}