/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;

/**
 * Tests for {@link MasterFifoRpcScheduler}: checks that the master of a mini cluster is wired
 * with this scheduler when the corresponding factory is configured, and that the call queue
 * information reported by the scheduler accounts for every dispatched call.
 */
@Category({ RPCTests.class, LargeTests.class })
public class TestMasterFifoRpcScheduler {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class);

  /** Method name reported by mock tasks that stand in for region server report calls. */
  private static final String REGION_SERVER_REPORT = "RegionServerReport";

  /** Method name reported by every other mock task. */
  private static final String OTHER = "Other";

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /**
   * Configures the master RPC scheduler factory, the handler count and the report handler count,
   * then starts the mini cluster shared by the tests of this class.
   * @throws Exception if the mini cluster fails to start
   */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
      "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
    conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2);
    TEST_UTIL.startMiniCluster();
  }

  /**
   * Shuts down the mini cluster started in {@link #setupBeforeClass()}.
   * @throws Exception if the mini cluster fails to shut down
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Verifies that the RPC scheduler used by the running master is a
   * {@link MasterFifoRpcScheduler}.
   */
  @Test
  public void testMasterRpcScheduler() {
    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
    MasterRpcServices masterRpcServices = master.getMasterRpcServices();
    RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
    Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
  }

  /**
   * Dispatches 30 slow mock calls to a standalone scheduler and verifies that the calls reported
   * by {@link RpcScheduler#getCallQueueInfo()} plus the calls that have started executing add up
   * to the number of successfully dispatched calls.
   * @throws Exception if the test thread is interrupted while pacing the dispatches
   */
  @Test
  public void testCallQueueInfo() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    AtomicInteger callExecutionCount = new AtomicInteger(0);

    // 2 call handlers and 1 region server report handler.
    RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
    scheduler.start();
    // Always stop the scheduler, even when an assertion below fails, so that a failing run
    // does not leave the started scheduler behind.
    try {
      int totalCallMethods = 30;
      int unableToDispatch = 0;

      // i counts down from 30 to 1: the first 11 tasks (i >= 20) are "Other" calls, the
      // remaining 19 (i < 20) are region server reports.
      for (int i = totalCallMethods; i > 0; i--) {
        CallRunner task = createMockTask(callExecutionCount, i < 20);
        if (!scheduler.dispatch(task)) {
          unableToDispatch++;
        }
        Thread.sleep(10);
      }

      // NOTE(review): the queue snapshot and the execution counter are read at two separate
      // instants; presumably a task leaving the queue in between could skew the sum - confirm.
      CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
      int executionCount = callExecutionCount.get();

      String expectedQueueName = "Master Fifo Queue";
      assertEquals(1, callQueueInfo.getCallQueueNames().size());

      long callQueueSize = 0;
      for (String queueName : callQueueInfo.getCallQueueNames()) {
        assertEquals(expectedQueueName, queueName);
        Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
        // Only when both kinds of call are reported can their exact names be checked.
        if (methodNames.size() == 2) {
          assertTrue(methodNames.contains(REGION_SERVER_REPORT));
          assertTrue(methodNames.contains(OTHER));
        }
        for (String methodName : methodNames) {
          callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
        }
      }

      assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
    } finally {
      scheduler.stop();
    }
  }

  /**
   * Builds a mock {@link CallRunner} whose request header carries either
   * {@link #REGION_SERVER_REPORT} or {@link #OTHER} as method name, and whose {@code run()}
   * increments the given counter and then sleeps for one second.
   * @param callExecutionCount counter incremented each time the task starts running
   * @param isRegionServerReportTask whether the task reports {@link #REGION_SERVER_REPORT}
   *          (true) or {@link #OTHER} (false) as its method name
   * @return the mock task
   */
  private CallRunner createMockTask(AtomicInteger callExecutionCount,
    boolean isRegionServerReportTask) {
    CallRunner task = mock(CallRunner.class);
    ServerCall call = mock(ServerCall.class);
    when(task.getRpcCall()).thenReturn(call);
    when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
      .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());

    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        callExecutionCount.incrementAndGet();
        // Keep the handler busy so that later dispatches are still pending when inspected.
        Thread.sleep(1000);
        return null;
      }
    }).when(task).run();

    return task;
  }

  /**
   * {@link MasterFifoRpcScheduler} that derives the call method name from the request header
   * instead of the method descriptor.
   */
  private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {

    public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
      int rsReportHandlerCount) {
      super(conf, callHandlerCount, rsReportHandlerCount);
    }

    /**
     * Override this method because we can't mock a Descriptors.MethodDescriptor
     * @param task the call whose method name is wanted
     * @return the method name from the call's request header, or null if the call has no header
     */
    @Override
    protected String getCallMethod(final CallRunner task) {
      RpcCall call = task.getRpcCall();
      if (call.getHeader() != null) {
        return call.getHeader().getMethodName();
      }
      return null;
    }
  }
}