001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.mockito.Mockito.doAnswer; 022import static org.mockito.Mockito.mock; 023import static org.mockito.Mockito.when; 024 025import java.io.IOException; 026import java.lang.reflect.Field; 027import java.net.InetSocketAddress; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 034import org.apache.hadoop.hbase.testclassification.RPCTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.jupiter.api.BeforeEach; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.mockito.invocation.InvocationOnMock; 040import org.mockito.stubbing.Answer; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@Tag(RPCTests.TAG) 045@Tag(SmallTests.TAG) 046public class TestFifoRpcScheduler { 047 048 private static final Logger LOG = LoggerFactory.getLogger(TestFifoRpcScheduler.class); 049 050 private AtomicInteger callExecutionCount; 051 052 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 053 @Override 054 public InetSocketAddress getListenerAddress() { 055 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 056 } 057 }; 058 private Configuration conf; 059 060 @BeforeEach 061 public void setUp() { 062 conf = HBaseConfiguration.create(); 063 callExecutionCount = new AtomicInteger(0); 064 } 065 066 private ThreadPoolExecutor disableHandlers(RpcScheduler scheduler) { 067 ThreadPoolExecutor rpcExecutor = null; 068 069 try { 070 Field ExecutorField = scheduler.getClass().getDeclaredField("executor"); 071 ExecutorField.setAccessible(true); 072 073 scheduler.start(); 074 rpcExecutor = (ThreadPoolExecutor) ExecutorField.get(scheduler); 075 076 rpcExecutor.setMaximumPoolSize(1); 077 rpcExecutor.allowCoreThreadTimeOut(true); 078 rpcExecutor.setCorePoolSize(0); 079 rpcExecutor.setKeepAliveTime(1, TimeUnit.MICROSECONDS); 080 081 // Wait for 2 seconds, so that idle threads will die 082 Thread.sleep(2000); 083 084 } catch (NoSuchFieldException e) { 085 LOG.error("No such field exception:" + e); 086 } catch (IllegalAccessException e) { 087 LOG.error("Illegal access exception:" + e); 088 } catch (InterruptedException e) { 089 LOG.error("Interrupted exception:" + e); 090 } 091 092 return rpcExecutor; 093 } 094 095 @Test 096 public void testCallQueueInfo() throws IOException, InterruptedException { 097 098 ThreadPoolExecutor rpcExecutor; 099 RpcScheduler scheduler = new FifoRpcScheduler(conf, 1); 100 101 scheduler.init(CONTEXT); 102 103 // Set number of handlers to a minimum value 104 disableHandlers(scheduler); 105 106 int totalCallMethods = 30; 107 int unableToDispatch = 0; 108 109 for (int i = totalCallMethods; i > 0; i--) { 110 CallRunner task = createMockTask(); 111 task.setStatus(new MonitoredRPCHandlerImpl("test")); 112 113 if (!scheduler.dispatch(task)) { 114 unableToDispatch++; 115 } 116 117 Thread.sleep(10); 118 } 119 120 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 121 int executionCount = callExecutionCount.get(); 122 int callQueueSize = 0; 123 124 for (String callQueueName : callQueueInfo.getCallQueueNames()) { 125 for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) { 126 callQueueSize += callQueueInfo.getCallMethodCount(callQueueName, calledMethod); 127 } 128 } 129 130 assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount); 131 132 scheduler.stop(); 133 } 134 135 private CallRunner createMockTask() { 136 ServerCall call = mock(ServerCall.class); 137 CallRunner task = mock(CallRunner.class); 138 when(task.getRpcCall()).thenReturn(call); 139 140 doAnswer(new Answer<Void>() { 141 @Override 142 public Void answer(InvocationOnMock invocation) throws Throwable { 143 callExecutionCount.incrementAndGet(); 144 Thread.sleep(1000); 145 return null; 146 } 147 }).when(task).run(); 148 149 return task; 150 } 151 152}