001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Mockito.doAnswer;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.util.Set;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.master.HMaster;
034import org.apache.hadoop.hbase.master.MasterRpcServices;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.RPCTests;
037import org.junit.AfterClass;
038import org.junit.Assert;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.invocation.InvocationOnMock;
044import org.mockito.stubbing.Answer;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
047
048@Category({ RPCTests.class, LargeTests.class })
049public class TestMasterFifoRpcScheduler {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);
054
055  private static final String REGION_SERVER_REPORT = "RegionServerReport";
056  private static final String OTHER = "Other";
057  private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
058
059  @BeforeClass
060  public static void setupBeforeClass() throws Exception {
061    Configuration conf = TEST_UTIL.getConfiguration();
062    conf.set(MasterRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
063      "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
064    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
065    conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2);
066    TEST_UTIL.startMiniCluster();
067  }
068
069  @AfterClass
070  public static void tearDownAfterClass() throws Exception {
071    TEST_UTIL.shutdownMiniCluster();
072  }
073
074  @Test
075  public void testMasterRpcScheduler() {
076    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
077    MasterRpcServices masterRpcServices = master.getMasterRpcServices();
078    RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
079    Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
080  }
081
082  @Test
083  public void testCallQueueInfo() throws Exception {
084    Configuration conf = HBaseConfiguration.create();
085    AtomicInteger callExecutionCount = new AtomicInteger(0);
086
087    RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
088    scheduler.start();
089
090    int totalCallMethods = 30;
091    int unableToDispatch = 0;
092
093    for (int i = totalCallMethods; i > 0; i--) {
094      CallRunner task = createMockTask(callExecutionCount, i < 20);
095      if (!scheduler.dispatch(task)) {
096        unableToDispatch++;
097      }
098      Thread.sleep(10);
099    }
100
101    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
102    int executionCount = callExecutionCount.get();
103
104    String expectedQueueName = "Master Fifo Queue";
105    assertEquals(1, callQueueInfo.getCallQueueNames().size());
106
107    long callQueueSize = 0;
108    for (String queueName : callQueueInfo.getCallQueueNames()) {
109      assertEquals(expectedQueueName, queueName);
110      Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
111      if (methodNames.size() == 2) {
112        assertTrue(methodNames.contains(REGION_SERVER_REPORT));
113        assertTrue(methodNames.contains(OTHER));
114      }
115      for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
116        callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
117      }
118    }
119
120    assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
121    scheduler.stop();
122  }
123
124  private CallRunner createMockTask(AtomicInteger callExecutionCount,
125    boolean isRegionServerReportTask) {
126    CallRunner task = mock(CallRunner.class);
127    ServerCall call = mock(ServerCall.class);
128    when(task.getRpcCall()).thenReturn(call);
129    when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
130      .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());
131
132    doAnswer(new Answer<Void>() {
133      @Override
134      public Void answer(InvocationOnMock invocation) throws Throwable {
135        callExecutionCount.incrementAndGet();
136        Thread.sleep(1000);
137        return null;
138      }
139    }).when(task).run();
140
141    return task;
142  }
143
144  private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {
145
146    public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
147      int rsReportHandlerCount) {
148      super(conf, callHandlerCount, rsReportHandlerCount);
149    }
150
151    /**
152     * Override this method because we can't mock a Descriptors.MethodDescriptor
153     */
154    @Override
155    protected String getCallMethod(final CallRunner task) {
156      RpcCall call = task.getRpcCall();
157      if (call.getHeader() != null) {
158        return call.getHeader().getMethodName();
159      }
160      return null;
161    }
162  }
163}