001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY;
021import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_HANDLER_FACTOR;
022import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT;
023import static org.apache.hadoop.hbase.ipc.RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
024import static org.mockito.Mockito.mock;
025
026import java.util.List;
027import java.util.concurrent.BlockingQueue;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.testclassification.MediumTests;
031import org.apache.hadoop.hbase.testclassification.RPCTests;
032import org.junit.jupiter.api.Assertions;
033import org.junit.jupiter.api.BeforeAll;
034import org.junit.jupiter.api.Tag;
035import org.junit.jupiter.api.Test;
036import org.junit.jupiter.api.TestInfo;
037
038@Tag(RPCTests.TAG)
039@Tag(MediumTests.TAG)
040public class TestRpcExecutor {
041
042  private static Configuration conf;
043
044  @BeforeAll
045  public static void setUp() {
046    conf = HBaseConfiguration.create();
047  }
048
049  /**
050   * Test that validates default soft and hard limits when maxQueueLength is not explicitly
051   * configured (-1).
052   */
053  @Test
054  public void testDefaultQueueLimits(TestInfo testInfo) {
055    PriorityFunction qosFunction = mock(PriorityFunction.class);
056    int handlerCount = 100;
057    // Pass -1 to use default maxQueueLength calculation
058    int defaultMaxQueueLength = -1;
059
060    BalancedQueueRpcExecutor executor =
061      new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName(), handlerCount,
062        defaultMaxQueueLength, qosFunction, conf, null);
063
064    List<BlockingQueue<CallRunner>> queues = executor.getQueues();
065    int expectedQueueSize = Math.round(handlerCount * DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
066    Assertions.assertEquals(expectedQueueSize, queues.size(),
067      "Number of queues should be according to default callQueueHandlerFactor");
068
069    // By default, the soft limit depends on number of handler the queue will serve
070    int expectedSoftLimit =
071      (handlerCount / expectedQueueSize) * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
072    Assertions.assertEquals(expectedSoftLimit, executor.currentQueueLimit,
073      "Soft limit of queues is wrongly calculated");
074
075    // Hard limit should be maximum of softLimit and DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT
076    int hardQueueLimit = queues.get(0).remainingCapacity() + queues.get(0).size();
077    int expectedHardLimit = Math.max(expectedSoftLimit, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
078    Assertions.assertEquals(expectedHardLimit, hardQueueLimit,
079      "Default hard limit of queues is wrongly calculated ");
080  }
081
082  /**
083   * Test that validates configured soft and hard limits when maxQueueLength is explicitly set.
084   */
085  @Test
086  public void testConfiguredQueueLimits(TestInfo testInfo) {
087    float callQueueHandlerFactor = 0.2f;
088    conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor);
089    PriorityFunction qosFunction = mock(PriorityFunction.class);
090    int handlerCount = 100;
091    int maxQueueLength = 150;
092
093    BalancedQueueRpcExecutor executor =
094      new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName() + "1", handlerCount,
095        maxQueueLength, qosFunction, conf, null);
096
097    Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit,
098      "Configured soft limit is not applied.");
099
100    List<BlockingQueue<CallRunner>> queues1 = executor.getQueues();
101
102    int expectedQueueSize = Math.round(handlerCount * callQueueHandlerFactor);
103    Assertions.assertEquals(expectedQueueSize, queues1.size(),
104      "Number of queues should be according to callQueueHandlerFactor");
105
106    int hardQueueLimit1 = queues1.get(0).remainingCapacity() + queues1.get(0).size();
107    Assertions.assertEquals(DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, hardQueueLimit1,
108      "Default Hard limit is not applied");
109
110  }
111}