001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.mockito.Mockito.mock;
023
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.util.Arrays;
027import java.util.Collection;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
033import org.apache.hadoop.hbase.testclassification.RPCTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.ClassRule;
036import org.junit.Ignore;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039import org.junit.runner.RunWith;
040import org.junit.runners.Parameterized;
041import org.junit.runners.Parameterized.Parameter;
042import org.junit.runners.Parameterized.Parameters;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
045import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
046
047import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
048import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
049
050@RunWith(Parameterized.class)
051@Category({ RPCTests.class, SmallTests.class })
052public class TestRpcHandlerException {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestRpcHandlerException.class);
057
058  private final static Configuration CONF = HBaseConfiguration.create();
059
060  /**
061   * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
062   * client will hang and the test will fail. The test is meant to be a unit test to test the
063   * behavior.
064   */
065  private class AbortServer implements Abortable {
066    private boolean aborted = false;
067
068    @Override
069    public void abort(String why, Throwable e) {
070      aborted = true;
071    }
072
073    @Override
074    public boolean isAborted() {
075      return aborted;
076    }
077  }
078
079  @Parameters(name = "{index}: rpcServerImpl={0}")
080  public static Collection<Object[]> parameters() {
081    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
082        new Object[] { NettyRpcServer.class.getName() });
083  }
084
085  @Parameter(0)
086  public String rpcServerImpl;
087
088  /*
089   * This is a unit test to make sure to abort region server when the number of Rpc handler thread
090   * caught errors exceeds the threshold. Client will hang when RS aborts.
091   */
092  @Ignore
093  @Test
094  public void testRpcScheduler() throws IOException, InterruptedException {
095    PriorityFunction qosFunction = mock(PriorityFunction.class);
096    Abortable abortable = new AbortServer();
097    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
098    RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, 0, qosFunction, abortable, 0);
099    RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
100        Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
101        new InetSocketAddress("localhost", 0), CONF, scheduler);
102    try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
103      rpcServer.start();
104      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
105      stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
106    } catch (Throwable e) {
107      assert (abortable.isAborted() == true);
108    } finally {
109      rpcServer.stop();
110    }
111  }
112
113}