001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY; 021import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY; 022import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; 023import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.mock; 027 028import java.util.List; 029import java.util.concurrent.BlockingQueue; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.testclassification.RPCTests; 035import org.junit.Before; 036import org.junit.ClassRule; 037import org.junit.Rule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040import org.junit.rules.TestName; 041 042@Category({ RPCTests.class, MediumTests.class }) 043public class TestRWQueueRpcExecutor { 044 045 @ClassRule 046 public static final HBaseClassTestRule CLASS_RULE = 047 HBaseClassTestRule.forClass(TestRWQueueRpcExecutor.class); 048 049 @Rule 050 public TestName testName = new TestName(); 051 052 private Configuration conf; 053 054 @Before 055 public void setUp() { 056 conf = HBaseConfiguration.create(); 057 conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 058 conf.setFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 059 conf.setFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 060 } 061 062 @Test 063 public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { 064 PriorityFunction qosFunction = mock(PriorityFunction.class); 065 int softQueueLimit = 100; 066 RWQueueRpcExecutor executor = new RWQueueRpcExecutor(testName.getMethodName(), 100, 067 softQueueLimit, qosFunction, conf, null); 068 069 QueueBalancer readBalancer = executor.getReadBalancer(); 070 QueueBalancer writeBalancer = executor.getWriteBalancer(); 071 QueueBalancer scanBalancer = executor.getScanBalancer(); 072 073 assertTrue(readBalancer instanceof RandomQueueBalancer); 074 assertTrue(writeBalancer instanceof RandomQueueBalancer); 075 assertTrue(scanBalancer instanceof RandomQueueBalancer); 076 077 List<BlockingQueue<CallRunner>> readQueues = ((RandomQueueBalancer) readBalancer).getQueues(); 078 List<BlockingQueue<CallRunner>> writeQueues = ((RandomQueueBalancer) writeBalancer).getQueues(); 079 List<BlockingQueue<CallRunner>> scanQueues = ((RandomQueueBalancer) scanBalancer).getQueues(); 080 081 assertEquals(25, readQueues.size()); 082 assertEquals(50, writeQueues.size()); 083 assertEquals(25, scanQueues.size()); 084 assertEquals("Soft limit is not applied properly", softQueueLimit, executor.currentQueueLimit); 085 // Hard Limit is applied as the max capacity of the queue 086 int hardQueueLimit = readQueues.get(0).remainingCapacity() + readQueues.get(0).size(); 087 assertEquals("Default hard limit should be applied ", DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, 088 hardQueueLimit); 089 090 verifyDistinct(readQueues, writeQueues, scanQueues); 091 verifyDistinct(writeQueues, readQueues, scanQueues); 092 verifyDistinct(scanQueues, readQueues, writeQueues); 093 094 } 095 096 private void verifyDistinct(List<BlockingQueue<CallRunner>> queues, 097 List<BlockingQueue<CallRunner>>... others) throws InterruptedException { 098 CallRunner mock = mock(CallRunner.class); 099 for (BlockingQueue<CallRunner> queue : queues) { 100 queue.put(mock); 101 assertEquals(1, queue.size()); 102 } 103 104 for (List<BlockingQueue<CallRunner>> other : others) { 105 for (BlockingQueue<CallRunner> queue : other) { 106 assertEquals(0, queue.size()); 107 } 108 } 109 110 // clear them for next test 111 for (BlockingQueue<CallRunner> queue : queues) { 112 queue.clear(); 113 } 114 } 115}