001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Mockito.doAnswer;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.util.Set;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.master.HMaster;
034import org.apache.hadoop.hbase.master.MasterRpcServices;
035import org.apache.hadoop.hbase.regionserver.RSRpcServices;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.RPCTests;
038import org.junit.AfterClass;
039import org.junit.Assert;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.mockito.invocation.InvocationOnMock;
045import org.mockito.stubbing.Answer;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
050
051@Category({ RPCTests.class, LargeTests.class })
052public class TestMasterFifoRpcScheduler {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class);
059
060  private static final String REGION_SERVER_REPORT = "RegionServerReport";
061  private static final String OTHER = "Other";
062  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
063
064  @BeforeClass
065  public static void setupBeforeClass() throws Exception {
066    Configuration conf = TEST_UTIL.getConfiguration();
067    conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
068      "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
069    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
070    conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2);
071    TEST_UTIL.startMiniCluster();
072  }
073
074  @AfterClass
075  public static void tearDownAfterClass() throws Exception {
076    TEST_UTIL.shutdownMiniCluster();
077  }
078
079  @Test
080  public void testMasterRpcScheduler() {
081    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
082    MasterRpcServices masterRpcServices = master.getMasterRpcServices();
083    RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
084    Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
085  }
086
087  @Test
088  public void testCallQueueInfo() throws Exception {
089    Configuration conf = HBaseConfiguration.create();
090    AtomicInteger callExecutionCount = new AtomicInteger(0);
091
092    RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
093    scheduler.start();
094
095    int totalCallMethods = 30;
096    int unableToDispatch = 0;
097
098    for (int i = totalCallMethods; i > 0; i--) {
099      CallRunner task = createMockTask(callExecutionCount, i < 20);
100      if (!scheduler.dispatch(task)) {
101        unableToDispatch++;
102      }
103      Thread.sleep(10);
104    }
105
106    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
107    int executionCount = callExecutionCount.get();
108
109    String expectedQueueName = "Master Fifo Queue";
110    assertEquals(1, callQueueInfo.getCallQueueNames().size());
111
112    long callQueueSize = 0;
113    for (String queueName : callQueueInfo.getCallQueueNames()) {
114      assertEquals(expectedQueueName, queueName);
115      Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
116      if (methodNames.size() == 2) {
117        assertTrue(methodNames.contains(REGION_SERVER_REPORT));
118        assertTrue(methodNames.contains(OTHER));
119      }
120      for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
121        callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
122      }
123    }
124
125    assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
126    scheduler.stop();
127  }
128
129  private CallRunner createMockTask(AtomicInteger callExecutionCount,
130      boolean isRegionServerReportTask) {
131    CallRunner task = mock(CallRunner.class);
132    ServerCall call = mock(ServerCall.class);
133    when(task.getRpcCall()).thenReturn(call);
134    when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
135        .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());
136
137    doAnswer(new Answer<Void>() {
138      @Override
139      public Void answer(InvocationOnMock invocation) throws Throwable {
140        callExecutionCount.incrementAndGet();
141        Thread.sleep(1000);
142        return null;
143      }
144    }).when(task).run();
145
146    return task;
147  }
148
149  private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {
150
151    public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
152        int rsReportHandlerCount) {
153      super(conf, callHandlerCount, rsReportHandlerCount);
154    }
155
156    /**
157     * Override this method because we can't mock a Descriptors.MethodDescriptor
158     */
159    @Override
160    protected String getCallMethod(final CallRunner task) {
161      RpcCall call = task.getRpcCall();
162      if (call.getHeader() != null) {
163        return call.getHeader().getMethodName();
164      }
165      return null;
166    }
167  }
168}