001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 022import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 023import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 024import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 025 026import java.io.IOException; 027import java.security.PrivilegedExceptionAction; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; 034import org.apache.hadoop.security.UserGroupInformation; 035import org.apache.hadoop.security.token.Token; 036import org.apache.hadoop.security.token.TokenIdentifier; 037 038/** 039 * Implement SASL logic for netty rpc client. 040 * @since 2.0.0 041 */ 042@InterfaceAudience.Private 043public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 044 045 private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClientHandler.class); 046 047 private final Promise<Boolean> saslPromise; 048 049 private final UserGroupInformation ugi; 050 051 private final NettyHBaseSaslRpcClient saslRpcClient; 052 053 private final Configuration conf; 054 055 // flag to mark if Crypto AES encryption is enable 056 private boolean needProcessConnectionHeader = false; 057 058 /** 059 * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to 060 * simple. 061 */ 062 public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, 063 AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal, 064 boolean fallbackAllowed, Configuration conf) 065 throws IOException { 066 this.saslPromise = saslPromise; 067 this.ugi = ugi; 068 this.conf = conf; 069 this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal, 070 fallbackAllowed, conf.get( 071 "hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase())); 072 } 073 074 private void writeResponse(ChannelHandlerContext ctx, byte[] response) { 075 LOG.trace("Sending token size={} from initSASLContext.", response.length); 076 ctx.writeAndFlush( 077 ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); 078 } 079 080 private void tryComplete(ChannelHandlerContext ctx) { 081 if (!saslRpcClient.isComplete()) { 082 return; 083 } 084 085 saslRpcClient.setupSaslHandler(ctx.pipeline()); 086 setCryptoAESOption(); 087 088 saslPromise.setSuccess(true); 089 } 090 091 private void setCryptoAESOption() { 092 boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY. 093 getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP()); 094 needProcessConnectionHeader = saslEncryptionEnabled && conf.getBoolean( 095 "hbase.rpc.crypto.encryption.aes.enabled", false); 096 } 097 098 public boolean isNeedProcessConnectionHeader() { 099 return needProcessConnectionHeader; 100 } 101 102 @Override 103 public void handlerAdded(ChannelHandlerContext ctx) { 104 try { 105 byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 106 107 @Override 108 public byte[] run() throws Exception { 109 return saslRpcClient.getInitialResponse(); 110 } 111 }); 112 if (initialResponse != null) { 113 writeResponse(ctx, initialResponse); 114 } 115 tryComplete(ctx); 116 } catch (Exception e) { 117 // the exception thrown by handlerAdded will not be passed to the exceptionCaught below 118 // because netty will remove a handler if handlerAdded throws an exception. 119 exceptionCaught(ctx, e); 120 } 121 } 122 123 @Override 124 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 125 int len = msg.readInt(); 126 if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { 127 saslRpcClient.dispose(); 128 if (saslRpcClient.fallbackAllowed) { 129 saslPromise.trySuccess(false); 130 } else { 131 saslPromise.tryFailure(new FallbackDisallowedException()); 132 } 133 return; 134 } 135 LOG.trace("Reading input token size={} for processing by initSASLContext", len); 136 final byte[] challenge = new byte[len]; 137 msg.readBytes(challenge); 138 byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { 139 140 @Override 141 public byte[] run() throws Exception { 142 return saslRpcClient.evaluateChallenge(challenge); 143 } 144 }); 145 if (response != null) { 146 writeResponse(ctx, response); 147 } 148 tryComplete(ctx); 149 } 150 151 @Override 152 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 153 saslRpcClient.dispose(); 154 saslPromise.tryFailure(new ConnectionClosedException("Connection closed")); 155 ctx.fireChannelInactive(); 156 } 157 158 @Override 159 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 160 saslRpcClient.dispose(); 161 saslPromise.tryFailure(cause); 162 } 163}