001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.mockito.Mockito.doAnswer; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.util.Set; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.master.HMaster; 033import org.apache.hadoop.hbase.master.MasterRpcServices; 034import org.apache.hadoop.hbase.testclassification.LargeTests; 035import org.apache.hadoop.hbase.testclassification.RPCTests; 036import org.junit.jupiter.api.AfterAll; 037import org.junit.jupiter.api.BeforeAll; 038import org.junit.jupiter.api.Tag; 039import org.junit.jupiter.api.Test; 040import org.mockito.invocation.InvocationOnMock; 041import org.mockito.stubbing.Answer; 042 043import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 044 045@Tag(RPCTests.TAG) 046@Tag(LargeTests.TAG) 047public class TestMasterFifoRpcScheduler { 048 049 private static final String REGION_SERVER_REPORT = "RegionServerReport"; 050 private static final String OTHER = "Other"; 051 private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 052 053 @BeforeAll 054 public static void setupBeforeClass() throws Exception { 055 Configuration conf = TEST_UTIL.getConfiguration(); 056 conf.set(MasterRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS, 057 "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory"); 058 conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5); 059 conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2); 060 TEST_UTIL.startMiniCluster(); 061 } 062 063 @AfterAll 064 public static void tearDownAfterClass() throws Exception { 065 TEST_UTIL.shutdownMiniCluster(); 066 } 067 068 @Test 069 public void testMasterRpcScheduler() { 070 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 071 MasterRpcServices masterRpcServices = master.getMasterRpcServices(); 072 RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler(); 073 assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler); 074 } 075 076 @Test 077 public void testCallQueueInfo() throws Exception { 078 Configuration conf = HBaseConfiguration.create(); 079 AtomicInteger callExecutionCount = new AtomicInteger(0); 080 081 RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1); 082 scheduler.start(); 083 084 int totalCallMethods = 30; 085 int unableToDispatch = 0; 086 087 for (int i = totalCallMethods; i > 0; i--) { 088 CallRunner task = createMockTask(callExecutionCount, i < 20); 089 if (!scheduler.dispatch(task)) { 090 unableToDispatch++; 091 } 092 Thread.sleep(10); 093 } 094 095 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 096 int executionCount = callExecutionCount.get(); 097 098 String expectedQueueName = "Master Fifo Queue"; 099 assertEquals(1, callQueueInfo.getCallQueueNames().size()); 100 101 long callQueueSize = 0; 102 for (String queueName : callQueueInfo.getCallQueueNames()) { 103 assertEquals(expectedQueueName, queueName); 104 Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName); 105 if (methodNames.size() == 2) { 106 assertTrue(methodNames.contains(REGION_SERVER_REPORT)); 107 assertTrue(methodNames.contains(OTHER)); 108 } 109 for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) { 110 callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName); 111 } 112 } 113 114 assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount); 115 scheduler.stop(); 116 } 117 118 private CallRunner createMockTask(AtomicInteger callExecutionCount, 119 boolean isRegionServerReportTask) { 120 CallRunner task = mock(CallRunner.class); 121 ServerCall call = mock(ServerCall.class); 122 when(task.getRpcCall()).thenReturn(call); 123 when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder() 124 .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build()); 125 126 doAnswer(new Answer<Void>() { 127 @Override 128 public Void answer(InvocationOnMock invocation) throws Throwable { 129 callExecutionCount.incrementAndGet(); 130 Thread.sleep(1000); 131 return null; 132 } 133 }).when(task).run(); 134 135 return task; 136 } 137 138 private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler { 139 140 public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount, 141 int rsReportHandlerCount) { 142 super(conf, callHandlerCount, rsReportHandlerCount); 143 } 144 145 /** 146 * Override this method because we can't mock a Descriptors.MethodDescriptor 147 */ 148 @Override 149 protected String getCallMethod(final CallRunner task) { 150 RpcCall call = task.getRpcCall(); 151 if (call.getHeader() != null) { 152 return call.getHeader().getMethodName(); 153 } 154 return null; 155 } 156 } 157}