001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; 021import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_HANDLER_FACTOR; 022import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT; 023import static org.apache.hadoop.hbase.ipc.RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; 024import static org.mockito.Mockito.mock; 025 026import java.util.List; 027import java.util.concurrent.BlockingQueue; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.testclassification.MediumTests; 031import org.apache.hadoop.hbase.testclassification.RPCTests; 032import org.junit.jupiter.api.Assertions; 033import org.junit.jupiter.api.BeforeAll; 034import org.junit.jupiter.api.Tag; 035import org.junit.jupiter.api.Test; 036import org.junit.jupiter.api.TestInfo; 037 038@Tag(RPCTests.TAG) 039@Tag(MediumTests.TAG) 040public class TestRpcExecutor { 041 042 private static Configuration conf; 043 044 @BeforeAll 045 public static void setUp() { 046 conf = HBaseConfiguration.create(); 047 } 048 049 /** 050 * Test that validates default soft and hard limits when maxQueueLength is not explicitly 051 * configured (-1). 052 */ 053 @Test 054 public void testDefaultQueueLimits(TestInfo testInfo) { 055 PriorityFunction qosFunction = mock(PriorityFunction.class); 056 int handlerCount = 100; 057 // Pass -1 to use default maxQueueLength calculation 058 int defaultMaxQueueLength = -1; 059 060 BalancedQueueRpcExecutor executor = 061 new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName(), handlerCount, 062 defaultMaxQueueLength, qosFunction, conf, null); 063 064 List<BlockingQueue<CallRunner>> queues = executor.getQueues(); 065 int expectedQueueSize = Math.round(handlerCount * DEFAULT_CALL_QUEUE_HANDLER_FACTOR); 066 Assertions.assertEquals(expectedQueueSize, queues.size(), 067 "Number of queues should be according to default callQueueHandlerFactor"); 068 069 // By default, the soft limit depends on number of handler the queue will serve 070 int expectedSoftLimit = 071 (handlerCount / expectedQueueSize) * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; 072 Assertions.assertEquals(expectedSoftLimit, executor.currentQueueLimit, 073 "Soft limit of queues is wrongly calculated"); 074 075 // Hard limit should be maximum of softLimit and DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT 076 int hardQueueLimit = queues.get(0).remainingCapacity() + queues.get(0).size(); 077 int expectedHardLimit = Math.max(expectedSoftLimit, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); 078 Assertions.assertEquals(expectedHardLimit, hardQueueLimit, 079 "Default hard limit of queues is wrongly calculated "); 080 } 081 082 /** 083 * Test that validates configured soft and hard limits when maxQueueLength is explicitly set. 084 */ 085 @Test 086 public void testConfiguredQueueLimits(TestInfo testInfo) { 087 float callQueueHandlerFactor = 0.2f; 088 conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor); 089 PriorityFunction qosFunction = mock(PriorityFunction.class); 090 int handlerCount = 100; 091 int maxQueueLength = 150; 092 093 BalancedQueueRpcExecutor executor = 094 new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName() + "1", handlerCount, 095 maxQueueLength, qosFunction, conf, null); 096 097 Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit, 098 "Configured soft limit is not applied."); 099 100 List<BlockingQueue<CallRunner>> queues1 = executor.getQueues(); 101 102 int expectedQueueSize = Math.round(handlerCount * callQueueHandlerFactor); 103 Assertions.assertEquals(expectedQueueSize, queues1.size(), 104 "Number of queues should be according to callQueueHandlerFactor"); 105 106 int hardQueueLimit1 = queues1.get(0).remainingCapacity() + queues1.get(0).size(); 107 Assertions.assertEquals(DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, hardQueueLimit1, 108 "Default Hard limit is not applied"); 109 110 } 111}