001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.times; 025import static org.mockito.Mockito.verify; 026 027import java.io.IOException; 028import java.net.InetSocketAddress; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.Waiter; 034import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.testclassification.RPCTests; 037import org.junit.After; 038import org.junit.Before; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042 043import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 044import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 045import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 046import org.apache.hbase.thirdparty.io.netty.channel.Channel; 047import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 048import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 049 050import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 051import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 052 053/** 054 * Confirm that we truly close the NettyRpcConnection when the netty channel is closed. 055 */ 056@Category({ RPCTests.class, MediumTests.class }) 057public class TestNettyIPCCloseConnection { 058 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestNettyIPCCloseConnection.class); 062 063 private static Configuration CONF = HBaseConfiguration.create(); 064 065 private NioEventLoopGroup group; 066 067 private NettyRpcServer server; 068 069 private NettyRpcClient client; 070 071 private TestProtobufRpcProto.BlockingInterface stub; 072 073 @Before 074 public void setUp() throws IOException { 075 group = new NioEventLoopGroup(); 076 server = new NettyRpcServer(null, getClass().getSimpleName(), 077 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 078 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1), true); 079 NettyRpcClientConfigHelper.setEventLoopConfig(CONF, group, NioSocketChannel.class); 080 client = new NettyRpcClient(CONF); 081 server.start(); 082 stub = TestProtobufRpcServiceImpl.newBlockingStub(client, server.getListenerAddress()); 083 } 084 085 @After 086 public void tearDown() throws Exception { 087 Closeables.close(client, true); 088 server.stop(); 089 group.shutdownGracefully().sync(); 090 } 091 092 @Test 093 public void test() throws Exception { 094 assertEquals("test", 095 stub.echo(null, EchoRequestProto.newBuilder().setMessage("test").build()).getMessage()); 096 Channel channel = Iterators.getOnlyElement(server.allChannels.iterator()); 097 assertNotNull(channel); 098 NettyRpcFrameDecoder decoder = channel.pipeline().get(NettyRpcFrameDecoder.class); 099 // set a mock saslServer to verify that it will call the dispose method of this instance 100 HBaseSaslRpcServer saslServer = mock(HBaseSaslRpcServer.class); 101 decoder.connection.saslServer = saslServer; 102 client.close(); 103 // the channel should have been closed 104 channel.closeFuture().await(5, TimeUnit.SECONDS); 105 // verify that we have called the dispose method and set saslServer to null 106 Waiter.waitFor(CONF, 5000, () -> decoder.connection.saslServer == null); 107 verify(saslServer, times(1)).dispose(); 108 } 109}