/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
043import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 044import org.apache.hbase.thirdparty.io.netty.channel.Channel; 045import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 046import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 047 048import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 049import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 050 051/** 052 * Confirm that we truly close the NettyRpcConnection when the netty channel is closed. 053 */ 054@Tag(RPCTests.TAG) 055@Tag(MediumTests.TAG) 056public class TestNettyIPCCloseConnection { 057 058 private static Configuration CONF = HBaseConfiguration.create(); 059 060 private NioEventLoopGroup group; 061 062 private NettyRpcServer server; 063 064 private NettyRpcClient client; 065 066 private TestProtobufRpcProto.BlockingInterface stub; 067 068 @BeforeEach 069 public void setUp() throws IOException { 070 group = new NioEventLoopGroup(); 071 server = new NettyRpcServer(null, getClass().getSimpleName(), 072 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 073 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1), true); 074 NettyRpcClientConfigHelper.setEventLoopConfig(CONF, group, NioSocketChannel.class); 075 client = new NettyRpcClient(CONF); 076 server.start(); 077 stub = TestProtobufRpcServiceImpl.newBlockingStub(client, server.getListenerAddress()); 078 } 079 080 @AfterEach 081 public void tearDown() throws Exception { 082 Closeables.close(client, true); 083 server.stop(); 084 group.shutdownGracefully().sync(); 085 } 086 087 @Test 088 public void test() throws Exception { 089 assertEquals("test", 090 stub.echo(null, EchoRequestProto.newBuilder().setMessage("test").build()).getMessage()); 091 Channel channel = Iterators.getOnlyElement(server.allChannels.iterator()); 092 assertNotNull(channel); 093 
NettyRpcFrameDecoder decoder = channel.pipeline().get(NettyRpcFrameDecoder.class); 094 // set a mock saslServer to verify that it will call the dispose method of this instance 095 HBaseSaslRpcServer saslServer = mock(HBaseSaslRpcServer.class); 096 decoder.connection.saslServer = saslServer; 097 client.close(); 098 // the channel should have been closed 099 channel.closeFuture().await(5, TimeUnit.SECONDS); 100 // verify that we have called the dispose method and set saslServer to null 101 Waiter.waitFor(CONF, 5000, () -> decoder.connection.saslServer == null); 102 verify(saslServer, times(1)).dispose(); 103 } 104}