001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import com.google.protobuf.RpcCallback; 025import com.google.protobuf.RpcController; 026import com.google.protobuf.Service; 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.util.Collections; 030import org.apache.hadoop.hbase.CoprocessorEnvironment; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.RetriesExhaustedException; 036import org.apache.hadoop.hbase.client.TestAsyncAdminBase; 037import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; 038import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; 039import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; 040import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; 041import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 042import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; 043import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; 044import org.apache.hadoop.hbase.testclassification.ClientTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.junit.runner.RunWith; 051import org.junit.runners.Parameterized; 052 053@RunWith(Parameterized.class) 054@Category({ ClientTests.class, MediumTests.class }) 055public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class); 059 060 private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); 061 private static final String DUMMY_VALUE = "val"; 062 063 @BeforeClass 064 public static void setUpBeforeClass() throws Exception { 065 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 066 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 067 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 068 TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 069 ProtobufCoprocessorService.class.getName()); 070 TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 071 DummyRegionServerEndpoint.class.getName()); 072 TEST_UTIL.startMiniCluster(2); 073 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 074 } 075 076 @Test 077 public void testMasterCoprocessorService() throws Exception { 078 TestProtos.EchoRequestProto request = 079 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); 080 TestProtos.EchoResponseProto response = 081 admin 082 .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> 083 coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub, 084 (s, c, done) -> s.echo(c, request, done)).get(); 085 assertEquals("hello", response.getMessage()); 086 } 087 088 @Test 089 public void testMasterCoprocessorError() throws Exception { 090 TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance(); 091 try { 092 admin 093 .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> 094 coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub, 095 (s, c, done) -> s.error(c, emptyRequest, done)).get(); 096 fail("Should have thrown an exception"); 097 } catch (Exception e) { 098 } 099 } 100 101 @Test 102 public void testRegionServerCoprocessorService() throws Exception { 103 final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); 104 DummyRegionServerEndpointProtos.DummyRequest request = 105 DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); 106 DummyRegionServerEndpointProtos.DummyResponse response = 107 admin 108 .<DummyRegionServerEndpointProtos.DummyService.Stub, 109 DummyRegionServerEndpointProtos.DummyResponse> coprocessorService( 110 DummyRegionServerEndpointProtos.DummyService::newStub, 111 (s, c, done) -> s.dummyCall(c, request, done), serverName).get(); 112 assertEquals(DUMMY_VALUE, response.getValue()); 113 } 114 115 @Test 116 public void testRegionServerCoprocessorServiceError() throws Exception { 117 final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); 118 DummyRegionServerEndpointProtos.DummyRequest request = 119 DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); 120 try { 121 admin 122 .<DummyRegionServerEndpointProtos.DummyService.Stub, 123 DummyRegionServerEndpointProtos.DummyResponse> coprocessorService( 124 DummyRegionServerEndpointProtos.DummyService::newStub, 125 (s, c, done) -> s.dummyThrow(c, request, done), serverName).get(); 126 fail("Should have thrown an exception"); 127 } catch (Exception e) { 128 assertTrue(e.getCause() instanceof RetriesExhaustedException); 129 assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); 130 } 131 } 132 133 public static class DummyRegionServerEndpoint extends DummyService 134 implements RegionServerCoprocessor { 135 public DummyRegionServerEndpoint() {} 136 137 @Override 138 public Iterable<Service> getServices() { 139 return Collections.singleton(this); 140 } 141 142 @Override 143 public void start(CoprocessorEnvironment env) throws IOException { 144 } 145 146 @Override 147 public void stop(CoprocessorEnvironment env) throws IOException { 148 } 149 150 @Override 151 public void dummyCall(RpcController controller, DummyRequest request, 152 RpcCallback<DummyResponse> callback) { 153 callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); 154 } 155 156 @Override 157 public void dummyThrow(RpcController controller, 158 DummyRequest request, 159 RpcCallback<DummyResponse> done) { 160 CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW); 161 } 162 } 163}