001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertThrows; 024 025import java.io.IOException; 026import java.net.InetSocketAddress; 027import java.util.Arrays; 028import java.util.stream.Stream; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.logging.Log4jUtils; 034import org.apache.hadoop.hbase.testclassification.RPCTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.jupiter.api.AfterEach; 037import org.junit.jupiter.api.BeforeEach; 038import org.junit.jupiter.api.Tag; 039import org.junit.jupiter.api.TestTemplate; 040import org.junit.jupiter.params.provider.Arguments; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 043 044import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; 045import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 046import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 047import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 048 049/** 050 * Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition 051 * of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from 052 * <code>src/test/protobuf/test_rpc_service.proto</code> 053 */ 054@Tag(RPCTests.TAG) 055@Tag(SmallTests.TAG) 056@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}") 057public class TestProtoBufRpc { 058 059 public final static String ADDRESS = "localhost"; 060 private static int PORT = 0; 061 private InetSocketAddress isa; 062 private Configuration conf; 063 private RpcServerInterface server; 064 private final String rpcServerImpl; 065 066 public TestProtoBufRpc(String rpcServerImpl) { 067 this.rpcServerImpl = rpcServerImpl; 068 } 069 070 public static Stream<Arguments> parameters() { 071 return Arrays 072 .stream(new Object[] { SimpleRpcServer.class.getName(), NettyRpcServer.class.getName() }) 073 .map(Arguments::of); 074 } 075 076 @BeforeEach 077 public void setUp() throws IOException { // Setup server for both protocols 078 this.conf = HBaseConfiguration.create(); 079 this.conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 080 Log4jUtils.setLogLevel("org.apache.hadoop.ipc.HBaseServer", "ERROR"); 081 Log4jUtils.setLogLevel("org.apache.hadoop.ipc.HBaseServer.trace", "TRACE"); 082 // Create server side implementation 083 // Get RPC server for server side implementation 084 this.server = RpcServerFactory.createRpcServer(null, "testrpc", 085 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 086 new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); 087 InetSocketAddress address = server.getListenerAddress(); 088 if (address == null) { 089 throw new IOException("Listener channel is closed"); 090 } 091 this.isa = address; 092 this.server.start(); 093 } 094 095 @AfterEach 096 public void tearDown() throws Exception { 097 server.stop(); 098 } 099 100 @TestTemplate 101 public void testProtoBufRpc() throws Exception { 102 RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); 103 try { 104 BlockingInterface stub = newBlockingStub(rpcClient, this.isa); 105 // Test ping method 106 TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build(); 107 stub.ping(null, emptyRequest); 108 109 // Test echo method 110 EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build(); 111 EchoResponseProto echoResponse = stub.echo(null, echoRequest); 112 assertEquals("hello", echoResponse.getMessage()); 113 114 assertThrows(org.apache.hbase.thirdparty.com.google.protobuf.ServiceException.class, 115 () -> stub.error(null, emptyRequest)); 116 } finally { 117 rpcClient.close(); 118 } 119 } 120}