001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import javax.security.sasl.SaslClient; 021 022import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 023import org.apache.yetus.audience.InterfaceAudience; 024 025import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 026import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 027import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 028import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 029import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 030import org.apache.hbase.thirdparty.io.netty.channel.CoalescingBufferQueue; 031import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 032import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; 033 034 035/** 036 * wrap sasl messages. 037 */ 038@InterfaceAudience.Private 039public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 040 041 private final SaslClient saslClient; 042 043 private CoalescingBufferQueue queue; 044 045 public SaslWrapHandler(SaslClient saslClient) { 046 this.saslClient = saslClient; 047 } 048 049 @Override 050 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 051 queue = new CoalescingBufferQueue(ctx.channel()); 052 } 053 054 @Override 055 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 056 throws Exception { 057 if (msg instanceof ByteBuf) { 058 queue.add((ByteBuf) msg, promise); 059 } else { 060 ctx.write(msg, promise); 061 } 062 } 063 064 @Override 065 public void flush(ChannelHandlerContext ctx) throws Exception { 066 if (queue.isEmpty()) { 067 return; 068 } 069 ByteBuf buf = null; 070 try { 071 ChannelPromise promise = ctx.newPromise(); 072 int readableBytes = queue.readableBytes(); 073 buf = queue.remove(readableBytes, promise); 074 byte[] bytes = new byte[readableBytes]; 075 buf.readBytes(bytes); 076 byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); 077 ChannelPromise lenPromise = ctx.newPromise(); 078 ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); 079 ChannelPromise contentPromise = ctx.newPromise(); 080 ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); 081 PromiseCombiner combiner = new PromiseCombiner(); 082 combiner.addAll(lenPromise, contentPromise); 083 combiner.finish(promise); 084 ctx.flush(); 085 } finally { 086 if (buf != null) { 087 ReferenceCountUtil.safeRelease(buf); 088 } 089 } 090 } 091 092 @Override 093 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 094 if (!queue.isEmpty()) { 095 queue.releaseAndFailAll(new ConnectionClosedException("Connection closed")); 096 } 097 ctx.close(promise); 098 } 099}