001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.mockito.Mockito.mock;
023
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.util.Arrays;
027import java.util.stream.Stream;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
032import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
033import org.apache.hadoop.hbase.testclassification.RPCTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.jupiter.api.Tag;
036import org.junit.jupiter.api.TestTemplate;
037import org.junit.jupiter.params.provider.Arguments;
038
039import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
040import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
041
042import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
044
045@Tag(RPCTests.TAG)
046@Tag(SmallTests.TAG)
047@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}")
048public class TestRpcHandlerException {
049
050  private final static Configuration CONF = HBaseConfiguration.create();
051  private final String rpcServerImpl;
052
053  /**
054   * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
055   * client will hang and the test will fail. The test is meant to be a unit test to test the
056   * behavior.
057   */
058  private class AbortServer implements Abortable {
059    private boolean aborted = false;
060
061    @Override
062    public void abort(String why, Throwable e) {
063      aborted = true;
064    }
065
066    @Override
067    public boolean isAborted() {
068      return aborted;
069    }
070  }
071
072  public TestRpcHandlerException(String rpcServerImpl) {
073    this.rpcServerImpl = rpcServerImpl;
074  }
075
076  public static Stream<Arguments> parameters() {
077    return Arrays
078      .stream(new Object[] { SimpleRpcServer.class.getName(), NettyRpcServer.class.getName() })
079      .map(Arguments::of);
080  }
081
082  /*
083   * This is a unit test to make sure to abort region server when the number of Rpc handler thread
084   * caught errors exceeds the threshold. Client will hang when RS aborts.
085   */
086  @TestTemplate
087  public void testRpcScheduler() throws IOException, InterruptedException {
088    PriorityFunction qosFunction = mock(PriorityFunction.class);
089    Abortable abortable = new AbortServer();
090    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
091    RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, 0, qosFunction, abortable, 0);
092    RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
093      Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
094      new InetSocketAddress("localhost", 0), CONF, scheduler);
095    try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
096      rpcServer.start();
097      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
098      stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
099    } catch (Throwable e) {
100      assert (abortable.isAborted() == true);
101    } finally {
102      rpcServer.stop();
103    }
104  }
105
106}