001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 022import org.apache.yetus.audience.InterfaceAudience; 023 024import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 025import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 026import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 027import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 028import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 029 030import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 031 032/** 033 * Implement logic to deal with the rpc connection header. 034 * @since 2.0.0 035 */ 036@InterfaceAudience.Private 037public class NettyHBaseRpcConnectionHeaderHandler extends SimpleChannelInboundHandler<ByteBuf> { 038 039 private final Promise<Boolean> saslPromise; 040 041 private final Configuration conf; 042 043 private final ByteBuf connectionHeaderWithLength; 044 045 public NettyHBaseRpcConnectionHeaderHandler(Promise<Boolean> saslPromise, Configuration conf, 046 ByteBuf connectionHeaderWithLength) { 047 this.saslPromise = saslPromise; 048 this.conf = conf; 049 this.connectionHeaderWithLength = connectionHeaderWithLength; 050 } 051 052 @Override 053 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 054 // read the ConnectionHeaderResponse from server 055 int len = msg.readInt(); 056 byte[] buff = new byte[len]; 057 msg.readBytes(buff); 058 059 RPCProtos.ConnectionHeaderResponse connectionHeaderResponse = 060 RPCProtos.ConnectionHeaderResponse.parseFrom(buff); 061 062 // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher 063 if (connectionHeaderResponse.hasCryptoCipherMeta()) { 064 CryptoAES cryptoAES = 065 EncryptionUtil.createCryptoAES(connectionHeaderResponse.getCryptoCipherMeta(), conf); 066 // replace the Sasl handler with Crypto AES handler 067 setupCryptoAESHandler(ctx.pipeline(), cryptoAES); 068 } 069 070 saslPromise.setSuccess(true); 071 } 072 073 @Override 074 public void handlerAdded(ChannelHandlerContext ctx) { 075 try { 076 // send the connection header to server first 077 ctx.writeAndFlush(connectionHeaderWithLength.retainedDuplicate()); 078 } catch (Exception e) { 079 // the exception thrown by handlerAdded will not be passed to the exceptionCaught below 080 // because netty will remove a handler if handlerAdded throws an exception. 081 exceptionCaught(ctx, e); 082 } 083 } 084 085 @Override 086 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 087 saslPromise.tryFailure(cause); 088 } 089 090 /** 091 * Remove handlers for sasl encryption and add handlers for Crypto AES encryption 092 */ 093 private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) { 094 p.replace(SaslWrapHandler.class, null, new SaslWrapHandler(cryptoAES::wrap)); 095 p.replace(SaslUnwrapHandler.class, null, new SaslUnwrapHandler(cryptoAES::unwrap)); 096 } 097}