/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that the calls reported by {@link RpcScheduler#getCallQueueInfo()} for a
 * {@link FifoRpcScheduler}, added to the calls that have already begun executing, account for
 * every call the scheduler accepted.
 */
@Category({ RPCTests.class, SmallTests.class })
public class TestFifoRpcScheduler {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFifoRpcScheduler.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFifoRpcScheduler.class);

  /** Scheduler context that reports a fixed, unresolved listener address. */
  private static final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
    @Override
    public InetSocketAddress getListenerAddress() {
      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
    }
  };

  /** Number of mocked calls whose {@code run()} has been entered; reset before each test. */
  private AtomicInteger callExecutionCount;

  private Configuration conf;

  @Before
  public void setUp() {
    conf = HBaseConfiguration.create();
    callExecutionCount = new AtomicInteger(0);
  }

  /**
   * Starts the scheduler and shrinks its thread pool so that dispatched calls pile up in the
   * queue instead of being executed immediately.
   * <p>
   * The pool is read reflectively from the field named {@code executor} declared on the
   * scheduler's class, then limited to at most one thread with no core threads and a
   * one-microsecond keep-alive. The method then sleeps for two seconds so that idle threads can
   * terminate.
   * <p>
   * Reflection failures are logged rather than rethrown. If the thread is interrupted, the
   * interrupt flag is restored so that the caller can observe it.
   * @param scheduler the scheduler to start and throttle
   * @return the scheduler's executor, or {@code null} if it could not be read reflectively
   */
  private ThreadPoolExecutor disableHandlers(RpcScheduler scheduler) {
    ThreadPoolExecutor rpcExecutor = null;

    try {
      Field executorField = scheduler.getClass().getDeclaredField("executor");
      executorField.setAccessible(true);

      scheduler.start();
      rpcExecutor = (ThreadPoolExecutor) executorField.get(scheduler);

      rpcExecutor.setMaximumPoolSize(1);
      rpcExecutor.allowCoreThreadTimeOut(true);
      rpcExecutor.setCorePoolSize(0);
      rpcExecutor.setKeepAliveTime(1, TimeUnit.MICROSECONDS);

      // Wait for 2 seconds, so that idle threads will die
      Thread.sleep(2000);

    } catch (NoSuchFieldException e) {
      LOG.error("No such field exception:", e);
    } catch (IllegalAccessException e) {
      LOG.error("Illegal access exception:", e);
    } catch (InterruptedException e) {
      LOG.error("Interrupted exception:", e);
      // Do not swallow the interruption: keep the flag set for whoever runs next on this thread.
      Thread.currentThread().interrupt();
    }

    return rpcExecutor;
  }

  /**
   * Dispatches a batch of slow mocked calls and verifies that every call the scheduler accepted
   * is either still listed in the call queue info or has started executing.
   */
  @Test
  public void testCallQueueInfo() throws IOException, InterruptedException {
    RpcScheduler scheduler = new FifoRpcScheduler(conf, 1);

    scheduler.init(CONTEXT);

    // Set number of handlers to a minimum value
    disableHandlers(scheduler);

    // Stop the scheduler even when dispatching or the assertion fails, so that its threads are
    // not left running after the test.
    try {
      int totalCallMethods = 30;
      int unableToDispatch = 0;

      for (int i = totalCallMethods; i > 0; i--) {
        CallRunner task = createMockTask();
        task.setStatus(new MonitoredRPCHandlerImpl());

        if (!scheduler.dispatch(task)) {
          unableToDispatch++;
        }

        Thread.sleep(10);
      }

      CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
      int executionCount = callExecutionCount.get();
      int callQueueSize = 0;

      // Sum the per-method call counts over every queue the scheduler reports.
      for (String callQueueName : callQueueInfo.getCallQueueNames()) {
        for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
          callQueueSize += callQueueInfo.getCallMethodCount(callQueueName, calledMethod);
        }
      }

      assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
    } finally {
      scheduler.stop();
    }
  }

  /**
   * Builds a mocked {@link CallRunner} backed by a mocked {@link ServerCall}. Its {@code run()}
   * increments {@link #callExecutionCount} and then sleeps for one second, keeping the handler
   * thread busy.
   * @return the mocked call runner
   */
  private CallRunner createMockTask() {
    ServerCall call = mock(ServerCall.class);
    CallRunner task = mock(CallRunner.class);
    when(task.getRpcCall()).thenReturn(call);

    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        callExecutionCount.incrementAndGet();
        Thread.sleep(1000);
        return null;
      }
    }).when(task).run();

    return task;
  }

}