001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.mockito.Mockito.mock;
023
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.util.Arrays;
027import java.util.Collection;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
033import org.apache.hadoop.hbase.testclassification.RPCTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.junit.runner.RunWith;
039import org.junit.runners.Parameterized;
040import org.junit.runners.Parameterized.Parameter;
041import org.junit.runners.Parameterized.Parameters;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
045
046import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
047import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
048
049@RunWith(Parameterized.class)
050@Category({ RPCTests.class, SmallTests.class })
051public class TestRpcHandlerException {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055    HBaseClassTestRule.forClass(TestRpcHandlerException.class);
056
057  private final static Configuration CONF = HBaseConfiguration.create();
058
059  /**
060   * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
061   * client will hang and the test will fail. The test is meant to be a unit test to test the
062   * behavior.
063   */
064  private class AbortServer implements Abortable {
065    private boolean aborted = false;
066
067    @Override
068    public void abort(String why, Throwable e) {
069      aborted = true;
070    }
071
072    @Override
073    public boolean isAborted() {
074      return aborted;
075    }
076  }
077
078  @Parameters(name = "{index}: rpcServerImpl={0}")
079  public static Collection<Object[]> parameters() {
080    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
081      new Object[] { NettyRpcServer.class.getName() });
082  }
083
084  @Parameter(0)
085  public String rpcServerImpl;
086
087  /*
088   * This is a unit test to make sure to abort region server when the number of Rpc handler thread
089   * caught errors exceeds the threshold. Client will hang when RS aborts.
090   */
091  @Test
092  public void testRpcScheduler() throws IOException, InterruptedException {
093    PriorityFunction qosFunction = mock(PriorityFunction.class);
094    Abortable abortable = new AbortServer();
095    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
096    RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, 0, qosFunction, abortable, 0);
097    RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
098      Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
099      new InetSocketAddress("localhost", 0), CONF, scheduler);
100    try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
101      rpcServer.start();
102      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
103      stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
104    } catch (Throwable e) {
105      assert (abortable.isAborted() == true);
106    } finally {
107      rpcServer.stop();
108    }
109  }
110
111}