001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security; 020 021import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 022import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 023import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 024import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 025import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 026import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 027import org.apache.hbase.thirdparty.io.netty.channel.CoalescingBufferQueue; 028import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 029import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 032 033 034/** 035 * wrap messages with Crypto AES. 036 */ 037@InterfaceAudience.Private 038public class CryptoAESWrapHandler extends ChannelOutboundHandlerAdapter { 039 040 private final CryptoAES cryptoAES; 041 042 private CoalescingBufferQueue queue; 043 044 public CryptoAESWrapHandler(CryptoAES cryptoAES) { 045 this.cryptoAES = cryptoAES; 046 } 047 048 @Override 049 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 050 queue = new CoalescingBufferQueue(ctx.channel()); 051 } 052 053 @Override 054 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 055 throws Exception { 056 if (msg instanceof ByteBuf) { 057 queue.add((ByteBuf) msg, promise); 058 } else { 059 ctx.write(msg, promise); 060 } 061 } 062 063 @Override 064 public void flush(ChannelHandlerContext ctx) throws Exception { 065 if (queue.isEmpty()) { 066 return; 067 } 068 ByteBuf buf = null; 069 try { 070 ChannelPromise promise = ctx.newPromise(); 071 int readableBytes = queue.readableBytes(); 072 buf = queue.remove(readableBytes, promise); 073 byte[] bytes = new byte[readableBytes]; 074 buf.readBytes(bytes); 075 byte[] wrapperBytes = cryptoAES.wrap(bytes, 0, bytes.length); 076 ChannelPromise lenPromise = ctx.newPromise(); 077 ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); 078 ChannelPromise contentPromise = ctx.newPromise(); 079 ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); 080 PromiseCombiner combiner = new PromiseCombiner(); 081 combiner.addAll(lenPromise, contentPromise); 082 combiner.finish(promise); 083 ctx.flush(); 084 } finally { 085 if (buf != null) { 086 ReferenceCountUtil.safeRelease(buf); 087 } 088 } 089 } 090 091 @Override 092 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 093 if (!queue.isEmpty()) { 094 queue.releaseAndFailAll(new ConnectionClosedException("Connection closed")); 095 } 096 ctx.close(promise); 097 } 098}