001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import com.google.protobuf.RpcCallback;
025import com.google.protobuf.RpcController;
026import com.google.protobuf.Service;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.util.Collections;
030import org.apache.hadoop.hbase.CoprocessorEnvironment;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.RetriesExhaustedException;
036import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
037import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
038import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
039import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
040import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
041import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
042import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
043import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.runner.RunWith;
051import org.junit.runners.Parameterized;
052
053@RunWith(Parameterized.class)
054@Category({ ClientTests.class, MediumTests.class })
055public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class);
059
060  private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
061  private static final String DUMMY_VALUE = "val";
062
063  @BeforeClass
064  public static void setUpBeforeClass() throws Exception {
065    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
066    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
067    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
068    TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
069      ProtobufCoprocessorService.class.getName());
070    TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
071      DummyRegionServerEndpoint.class.getName());
072    TEST_UTIL.startMiniCluster(2);
073    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
074  }
075
076  @Test
077  public void testMasterCoprocessorService() throws Exception {
078    TestProtos.EchoRequestProto request =
079        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
080    TestProtos.EchoResponseProto response =
081        admin
082            .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto>
083                coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub,
084                  (s, c, done) -> s.echo(c, request, done)).get();
085    assertEquals("hello", response.getMessage());
086  }
087
088  @Test
089  public void testMasterCoprocessorError() throws Exception {
090    TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
091    try {
092      admin
093          .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto>
094              coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub,
095                (s, c, done) -> s.error(c, emptyRequest, done)).get();
096      fail("Should have thrown an exception");
097    } catch (Exception e) {
098    }
099  }
100
101  @Test
102  public void testRegionServerCoprocessorService() throws Exception {
103    final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
104    DummyRegionServerEndpointProtos.DummyRequest request =
105        DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
106    DummyRegionServerEndpointProtos.DummyResponse response =
107        admin
108            .<DummyRegionServerEndpointProtos.DummyService.Stub,
109                DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
110              DummyRegionServerEndpointProtos.DummyService::newStub,
111                  (s, c, done) -> s.dummyCall(c, request, done), serverName).get();
112    assertEquals(DUMMY_VALUE, response.getValue());
113  }
114
115  @Test
116  public void testRegionServerCoprocessorServiceError() throws Exception {
117    final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
118    DummyRegionServerEndpointProtos.DummyRequest request =
119        DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
120    try {
121      admin
122          .<DummyRegionServerEndpointProtos.DummyService.Stub,
123              DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
124            DummyRegionServerEndpointProtos.DummyService::newStub,
125                (s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
126      fail("Should have thrown an exception");
127    } catch (Exception e) {
128      assertTrue(e.getCause() instanceof RetriesExhaustedException);
129      assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
130    }
131  }
132
133  public static class DummyRegionServerEndpoint extends DummyService
134          implements RegionServerCoprocessor {
135    public DummyRegionServerEndpoint() {}
136
137    @Override
138    public Iterable<Service> getServices() {
139      return Collections.singleton(this);
140    }
141
142    @Override
143    public void start(CoprocessorEnvironment env) throws IOException {
144    }
145
146    @Override
147    public void stop(CoprocessorEnvironment env) throws IOException {
148    }
149
150    @Override
151    public void dummyCall(RpcController controller, DummyRequest request,
152        RpcCallback<DummyResponse> callback) {
153      callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
154    }
155
156    @Override
157    public void dummyThrow(RpcController controller,
158        DummyRequest request,
159        RpcCallback<DummyResponse> done) {
160      CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
161    }
162  }
163}