001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.Mockito.doAnswer;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.util.Set;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.master.HMaster;
033import org.apache.hadoop.hbase.master.MasterRpcServices;
034import org.apache.hadoop.hbase.testclassification.LargeTests;
035import org.apache.hadoop.hbase.testclassification.RPCTests;
036import org.junit.jupiter.api.AfterAll;
037import org.junit.jupiter.api.BeforeAll;
038import org.junit.jupiter.api.Tag;
039import org.junit.jupiter.api.Test;
040import org.mockito.invocation.InvocationOnMock;
041import org.mockito.stubbing.Answer;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
044
045@Tag(RPCTests.TAG)
046@Tag(LargeTests.TAG)
047public class TestMasterFifoRpcScheduler {
048
049  private static final String REGION_SERVER_REPORT = "RegionServerReport";
050  private static final String OTHER = "Other";
051  private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
052
053  @BeforeAll
054  public static void setupBeforeClass() throws Exception {
055    Configuration conf = TEST_UTIL.getConfiguration();
056    conf.set(MasterRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
057      "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
058    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
059    conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2);
060    TEST_UTIL.startMiniCluster();
061  }
062
063  @AfterAll
064  public static void tearDownAfterClass() throws Exception {
065    TEST_UTIL.shutdownMiniCluster();
066  }
067
068  @Test
069  public void testMasterRpcScheduler() {
070    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
071    MasterRpcServices masterRpcServices = master.getMasterRpcServices();
072    RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
073    assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
074  }
075
076  @Test
077  public void testCallQueueInfo() throws Exception {
078    Configuration conf = HBaseConfiguration.create();
079    AtomicInteger callExecutionCount = new AtomicInteger(0);
080
081    RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
082    scheduler.start();
083
084    int totalCallMethods = 30;
085    int unableToDispatch = 0;
086
087    for (int i = totalCallMethods; i > 0; i--) {
088      CallRunner task = createMockTask(callExecutionCount, i < 20);
089      if (!scheduler.dispatch(task)) {
090        unableToDispatch++;
091      }
092      Thread.sleep(10);
093    }
094
095    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
096    int executionCount = callExecutionCount.get();
097
098    String expectedQueueName = "Master Fifo Queue";
099    assertEquals(1, callQueueInfo.getCallQueueNames().size());
100
101    long callQueueSize = 0;
102    for (String queueName : callQueueInfo.getCallQueueNames()) {
103      assertEquals(expectedQueueName, queueName);
104      Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
105      if (methodNames.size() == 2) {
106        assertTrue(methodNames.contains(REGION_SERVER_REPORT));
107        assertTrue(methodNames.contains(OTHER));
108      }
109      for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
110        callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
111      }
112    }
113
114    assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
115    scheduler.stop();
116  }
117
118  private CallRunner createMockTask(AtomicInteger callExecutionCount,
119    boolean isRegionServerReportTask) {
120    CallRunner task = mock(CallRunner.class);
121    ServerCall call = mock(ServerCall.class);
122    when(task.getRpcCall()).thenReturn(call);
123    when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
124      .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());
125
126    doAnswer(new Answer<Void>() {
127      @Override
128      public Void answer(InvocationOnMock invocation) throws Throwable {
129        callExecutionCount.incrementAndGet();
130        Thread.sleep(1000);
131        return null;
132      }
133    }).when(task).run();
134
135    return task;
136  }
137
138  private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {
139
140    public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
141      int rsReportHandlerCount) {
142      super(conf, callHandlerCount, rsReportHandlerCount);
143    }
144
145    /**
146     * Override this method because we can't mock a Descriptors.MethodDescriptor
147     */
148    @Override
149    protected String getCallMethod(final CallRunner task) {
150      RpcCall call = task.getRpcCall();
151      if (call.getHeader() != null) {
152        return call.getHeader().getMethodName();
153      }
154      return null;
155    }
156  }
157}