001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.awaitility.Awaitility.await; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertNotEquals; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.junit.jupiter.api.Assertions.fail; 026import static org.mockito.ArgumentMatchers.any; 027import static org.mockito.ArgumentMatchers.eq; 028import static org.mockito.Mockito.doAnswer; 029import static org.mockito.Mockito.mock; 030import static org.mockito.Mockito.timeout; 031import static org.mockito.Mockito.verify; 032import static org.mockito.Mockito.when; 033 034import java.io.IOException; 035import java.lang.reflect.Field; 036import java.net.InetSocketAddress; 037import java.util.ArrayList; 038import java.util.Collections; 039import java.util.HashSet; 040import java.util.List; 041import java.util.Map; 042import java.util.Set; 043import java.util.concurrent.CountDownLatch; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.atomic.AtomicInteger; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.Abortable; 048import org.apache.hadoop.hbase.HBaseConfiguration; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.client.Put; 051import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 052import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.testclassification.RPCTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.EnvironmentEdge; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.hadoop.hbase.util.Threads; 059import org.junit.jupiter.api.BeforeEach; 060import org.junit.jupiter.api.Tag; 061import org.junit.jupiter.api.Test; 062import org.junit.jupiter.api.TestInfo; 063import org.mockito.Mockito; 064import org.mockito.invocation.InvocationOnMock; 065import org.mockito.stubbing.Answer; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 072import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 073 074import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 078 079@Tag(RPCTests.TAG) 080@Tag(MediumTests.TAG) 081public class TestSimpleRpcScheduler { 082 083 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 084 085 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 086 @Override 087 public InetSocketAddress getListenerAddress() { 088 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 089 } 090 }; 091 private Configuration conf; 092 private String testMethodName; 093 private final AtomicInteger requestProcessed = new AtomicInteger(0); 094 095 @BeforeEach 096 public void setUp(TestInfo testInfo) { 097 testMethodName = testInfo.getTestMethod().get().getName(); 098 conf = HBaseConfiguration.create(); 099 } 100 101 @Test 102 public void testBasic() throws IOException, InterruptedException { 103 104 PriorityFunction qosFunction = mock(PriorityFunction.class); 105 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0); 106 scheduler.init(CONTEXT); 107 scheduler.start(); 108 CallRunner task = createMockTask(HConstants.NORMAL_QOS); 109 task.setStatus(new MonitoredRPCHandlerImpl("test")); 110 scheduler.dispatch(task); 111 verify(task, timeout(10000)).run(); 112 scheduler.stop(); 113 } 114 115 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 116 try { 117 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 118 ExecutorField.setAccessible(true); 119 120 RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler); 121 122 Field handlerCountField = 123 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount"); 124 125 handlerCountField.setAccessible(true); 126 handlerCountField.set(rpcExecutor, 0); 127 128 Field numCallQueuesField = 129 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues"); 130 131 numCallQueuesField.setAccessible(true); 132 numCallQueuesField.set(rpcExecutor, 1); 133 134 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass() 135 .getDeclaredField("currentQueueLimit"); 136 137 currentQueueLimitField.setAccessible(true); 138 currentQueueLimitField.set(rpcExecutor, 100); 139 140 } catch (NoSuchFieldException e) { 141 LOG.error("No such field exception" + e); 142 } catch (IllegalAccessException e) { 143 LOG.error("Illegal access exception" + e); 144 } 145 146 return scheduler; 147 } 148 149 @Test 150 public void testCallQueueInfo() throws IOException, InterruptedException { 151 152 PriorityFunction qosFunction = mock(PriorityFunction.class); 153 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0); 154 155 scheduler.init(CONTEXT); 156 157 // Set the handlers to zero. So that number of requests in call Queue can be tested 158 scheduler = disableHandlers(scheduler); 159 scheduler.start(); 160 161 int totalCallMethods = 10; 162 for (int i = totalCallMethods; i > 0; i--) { 163 CallRunner task = createMockTask(HConstants.NORMAL_QOS); 164 task.setStatus(new MonitoredRPCHandlerImpl("test")); 165 scheduler.dispatch(task); 166 } 167 168 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 169 170 for (String callQueueName : callQueueInfo.getCallQueueNames()) { 171 172 for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) { 173 assertEquals(totalCallMethods, 174 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 175 } 176 177 } 178 179 scheduler.stop(); 180 181 } 182 183 @Test 184 public void testHandlerIsolation() throws IOException, InterruptedException { 185 CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS); 186 CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1); 187 CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS); 188 List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask); 189 Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask, 190 HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS); 191 PriorityFunction qosFunction = mock(PriorityFunction.class); 192 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 193 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 194 Answer<Void> answerToRun = new Answer<Void>() { 195 @Override 196 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 197 synchronized (handlerThreads) { 198 handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread()); 199 } 200 countDownLatch.countDown(); 201 return null; 202 } 203 }; 204 for (CallRunner task : tasks) { 205 task.setStatus(new MonitoredRPCHandlerImpl("test")); 206 doAnswer(answerToRun).when(task).run(); 207 } 208 209 RpcScheduler scheduler = 210 new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS); 211 scheduler.init(CONTEXT); 212 scheduler.start(); 213 for (CallRunner task : tasks) { 214 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 215 scheduler.dispatch(task); 216 } 217 for (CallRunner task : tasks) { 218 verify(task, timeout(10000)).run(); 219 } 220 scheduler.stop(); 221 222 // Tests that these requests are handled by three distinct threads. 223 countDownLatch.await(); 224 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 225 } 226 227 private CallRunner createMockTask(int priority) { 228 ServerCall call = mock(ServerCall.class); 229 CallRunner task = mock(CallRunner.class); 230 RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build(); 231 when(task.getRpcCall()).thenReturn(call); 232 when(call.getHeader()).thenReturn(header); 233 when(call.getReceiveTime()).thenReturn(EnvironmentEdgeManager.currentTime()); 234 return task; 235 } 236 237 @Test 238 public void testRpcScheduler() throws Exception { 239 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 240 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 241 } 242 243 @Test 244 public void testPluggableRpcQueue() throws Exception { 245 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 246 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 247 248 try { 249 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass"); 250 fail("Expected a PluggableRpcQueueNotFound for unloaded class"); 251 } catch (PluggableRpcQueueNotFound e) { 252 // expected 253 } catch (Exception e) { 254 fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e); 255 } 256 257 try { 258 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 259 "org.apache.hadoop.hbase.ipc.SimpleRpcServer"); 260 fail("Expected a PluggableRpcQueueNotFound for incompatible class"); 261 } catch (PluggableRpcQueueNotFound e) { 262 // expected 263 } catch (Exception e) { 264 fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e); 265 } 266 } 267 268 @Test 269 public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception { 270 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 271 Configuration schedConf = HBaseConfiguration.create(); 272 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 273 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 274 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 275 schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true); 276 277 PriorityFunction priority = mock(PriorityFunction.class); 278 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 279 SimpleRpcScheduler scheduler = 280 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 281 282 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 283 f.setAccessible(true); 284 assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor); 285 } 286 287 @Test 288 public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception { 289 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 290 Configuration schedConf = HBaseConfiguration.create(); 291 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 292 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 293 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 294 295 PriorityFunction priority = mock(PriorityFunction.class); 296 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 297 SimpleRpcScheduler scheduler = 298 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 299 300 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 301 f.setAccessible(true); 302 assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor); 303 } 304 305 @Test 306 public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { 307 308 Configuration schedConf = HBaseConfiguration.create(); 309 310 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); 311 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 312 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 313 RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); 314 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 315 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 316 317 PriorityFunction priority = mock(PriorityFunction.class); 318 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 319 SimpleRpcScheduler scheduler = 320 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 321 try { 322 scheduler.start(); 323 324 CallRunner putCallTask = mock(CallRunner.class); 325 ServerCall putCall = mock(ServerCall.class); 326 putCall.param = 327 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 328 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 329 when(putCallTask.getRpcCall()).thenReturn(putCall); 330 when(putCall.getHeader()).thenReturn(putHead); 331 332 assertTrue(scheduler.dispatch(putCallTask)); 333 334 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4); 335 scheduler.onConfigurationChange(schedConf); 336 assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange()); 337 waitUntilQueueEmpty(scheduler); 338 } finally { 339 scheduler.stop(); 340 } 341 } 342 343 private void testRpcScheduler(final String queueType) throws Exception { 344 testRpcScheduler(queueType, null); 345 } 346 347 private void testRpcScheduler(final String queueType, final String pluggableQueueClass) 348 throws Exception { 349 Configuration schedConf = HBaseConfiguration.create(); 350 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 351 352 if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) { 353 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass); 354 } 355 356 PriorityFunction priority = mock(PriorityFunction.class); 357 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 358 359 RpcScheduler scheduler = 360 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 361 try { 362 scheduler.start(); 363 364 CallRunner smallCallTask = mock(CallRunner.class); 365 ServerCall smallCall = mock(ServerCall.class); 366 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 367 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 368 when(smallCall.getHeader()).thenReturn(smallHead); 369 370 CallRunner largeCallTask = mock(CallRunner.class); 371 ServerCall largeCall = mock(ServerCall.class); 372 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 373 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 374 when(largeCall.getHeader()).thenReturn(largeHead); 375 376 CallRunner hugeCallTask = mock(CallRunner.class); 377 ServerCall hugeCall = mock(ServerCall.class); 378 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 379 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 380 when(hugeCall.getHeader()).thenReturn(hugeHead); 381 382 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 383 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 384 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 385 386 final ArrayList<Integer> work = new ArrayList<>(); 387 doAnswerTaskExecution(smallCallTask, work, 10, 250); 388 doAnswerTaskExecution(largeCallTask, work, 50, 250); 389 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 390 391 scheduler.dispatch(smallCallTask); 392 scheduler.dispatch(smallCallTask); 393 scheduler.dispatch(smallCallTask); 394 scheduler.dispatch(hugeCallTask); 395 scheduler.dispatch(smallCallTask); 396 scheduler.dispatch(largeCallTask); 397 scheduler.dispatch(smallCallTask); 398 scheduler.dispatch(smallCallTask); 399 400 while (work.size() < 8) { 401 Thread.sleep(100); 402 } 403 404 int seqSum = 0; 405 int totalTime = 0; 406 for (int i = 0; i < work.size(); ++i) { 407 LOG.debug("Request i=" + i + " value=" + work.get(i)); 408 seqSum += work.get(i); 409 totalTime += seqSum; 410 } 411 LOG.debug("Total Time: " + totalTime); 412 413 // -> [small small small huge small large small small] 414 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 415 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 416 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 417 assertEquals(530, totalTime); 418 } else if ( 419 queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) 420 || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE) 421 ) { 422 assertEquals(930, totalTime); 423 } 424 } finally { 425 scheduler.stop(); 426 } 427 } 428 429 @Test 430 public void testScanQueueWithZeroScanRatio() throws Exception { 431 432 Configuration schedConf = HBaseConfiguration.create(); 433 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 434 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 435 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 436 437 PriorityFunction priority = mock(PriorityFunction.class); 438 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 439 440 RpcScheduler scheduler = 441 new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD); 442 assertNotEquals(null, scheduler); 443 } 444 445 @Test 446 public void testScanQueues() throws Exception { 447 Configuration schedConf = HBaseConfiguration.create(); 448 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 449 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 450 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 451 452 PriorityFunction priority = mock(PriorityFunction.class); 453 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 454 455 RpcScheduler scheduler = 456 new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); 457 try { 458 scheduler.start(); 459 460 CallRunner putCallTask = mock(CallRunner.class); 461 ServerCall putCall = mock(ServerCall.class); 462 putCall.param = 463 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 464 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 465 when(putCallTask.getRpcCall()).thenReturn(putCall); 466 when(putCall.getHeader()).thenReturn(putHead); 467 when(putCall.getParam()).thenReturn(putCall.param); 468 469 CallRunner getCallTask = mock(CallRunner.class); 470 ServerCall getCall = mock(ServerCall.class); 471 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 472 when(getCallTask.getRpcCall()).thenReturn(getCall); 473 when(getCall.getHeader()).thenReturn(getHead); 474 475 CallRunner scanCallTask = mock(CallRunner.class); 476 ServerCall scanCall = mock(ServerCall.class); 477 scanCall.param = ScanRequest.newBuilder().build(); 478 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 479 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 480 when(scanCall.getHeader()).thenReturn(scanHead); 481 when(scanCall.getParam()).thenReturn(scanCall.param); 482 483 CallRunner bulkLoadCallTask = mock(CallRunner.class); 484 ServerCall bulkLoadCall = mock(ServerCall.class); 485 bulkLoadCall.param = ScanRequest.newBuilder().build(); 486 RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build(); 487 when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall); 488 when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead); 489 when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param); 490 491 ArrayList<Integer> work = new ArrayList<>(); 492 doAnswerTaskExecution(putCallTask, work, 1, 1000); 493 doAnswerTaskExecution(getCallTask, work, 2, 1000); 494 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 495 doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000); 496 497 // There are 3 queues: [puts], [gets], [scans], [bulkload] 498 // so the calls will be interleaved 499 scheduler.dispatch(putCallTask); 500 scheduler.dispatch(putCallTask); 501 scheduler.dispatch(putCallTask); 502 scheduler.dispatch(getCallTask); 503 scheduler.dispatch(getCallTask); 504 scheduler.dispatch(getCallTask); 505 scheduler.dispatch(scanCallTask); 506 scheduler.dispatch(scanCallTask); 507 scheduler.dispatch(scanCallTask); 508 scheduler.dispatch(bulkLoadCallTask); 509 scheduler.dispatch(bulkLoadCallTask); 510 scheduler.dispatch(bulkLoadCallTask); 511 while (work.size() < 6) { 512 Thread.sleep(100); 513 } 514 515 for (int i = 0; i < work.size() - 2; i += 3) { 516 assertNotEquals(work.get(i + 0), work.get(i + 1)); 517 assertNotEquals(work.get(i + 0), work.get(i + 2)); 518 assertNotEquals(work.get(i + 1), work.get(i + 2)); 519 } 520 } finally { 521 scheduler.stop(); 522 } 523 } 524 525 private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results, 526 final int value, final int sleepInterval) { 527 callTask.setStatus(new MonitoredRPCHandlerImpl("test")); 528 doAnswer(new Answer<Object>() { 529 @Override 530 public Object answer(InvocationOnMock invocation) { 531 synchronized (results) { 532 results.add(value); 533 } 534 Threads.sleepWithoutInterrupt(sleepInterval); 535 return null; 536 } 537 }).when(callTask).run(); 538 } 539 540 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 541 throws InterruptedException { 542 while (scheduler.getGeneralQueueLength() > 0) { 543 Thread.sleep(100); 544 } 545 } 546 547 @Test 548 public void testSoftAndHardQueueLimits() throws Exception { 549 550 Configuration schedConf = HBaseConfiguration.create(); 551 552 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 553 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 554 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 555 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 556 557 PriorityFunction priority = mock(PriorityFunction.class); 558 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 559 SimpleRpcScheduler scheduler = 560 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 561 try { 562 scheduler.start(); 563 564 CallRunner putCallTask = mock(CallRunner.class); 565 ServerCall putCall = mock(ServerCall.class); 566 putCall.param = 567 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 568 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 569 when(putCallTask.getRpcCall()).thenReturn(putCall); 570 when(putCall.getHeader()).thenReturn(putHead); 571 572 assertTrue(scheduler.dispatch(putCallTask)); 573 574 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 575 scheduler.onConfigurationChange(schedConf); 576 assertFalse(scheduler.dispatch(putCallTask)); 577 waitUntilQueueEmpty(scheduler); 578 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 579 scheduler.onConfigurationChange(schedConf); 580 assertTrue(scheduler.dispatch(putCallTask)); 581 } finally { 582 scheduler.stop(); 583 } 584 } 585 586 private final class CoDelEnvironmentEdge implements EnvironmentEdge { 587 588 private long perRequestOffset; 589 private long gapInRequests; 590 private long testStartTime = 0; 591 592 private int requestCount = 0; 593 594 private final Set<String> threadNamePrefixes = new HashSet<>(); 595 private final String testThread = Thread.currentThread().getName(); 596 597 @Override 598 public long currentTime() { 599 String threadName = Thread.currentThread().getName(); 600 if (threadName.equals(testThread)) { 601 // test thread will create a request and add that to scheduler 602 // Replicating a constant rate of request arrival. 603 long requestStartTime = testStartTime + requestCount * gapInRequests; 604 requestCount++; 605 return requestStartTime; 606 } 607 // handler thread will pick the request from queue, this time will depend on processing time 608 // of handler 609 for (String threadNamePrefix : threadNamePrefixes) { 610 if (threadName.startsWith(threadNamePrefix)) { 611 long requestPickTime; 612 if (gapInRequests >= perRequestOffset) { 613 // it means handler can complete the processing of last request and will be free when it 614 // will come to serve next request. We don't need it in fast path but just in case for 615 // other tests 616 requestPickTime = testStartTime + requestProcessed.get() * gapInRequests; 617 } else { 618 // this means handler will still be busy processing the last requests when new request 619 // will come in queue 620 requestPickTime = testStartTime + requestProcessed.get() * perRequestOffset; 621 } 622 return requestPickTime; 623 } 624 } 625 return System.currentTimeMillis(); 626 } 627 628 public void startTestFor(int arrivalRatePerSecond, long requestProcessingTime) { 629 this.perRequestOffset = requestProcessingTime; 630 this.gapInRequests = 1000L / arrivalRatePerSecond; 631 this.requestCount = 0; 632 requestProcessed.set(0); 633 this.testStartTime = System.currentTimeMillis(); 634 } 635 } 636 637 // This test is fixing the processing time and arrival rate of the request and checking if CoDel 638 // can utilize the maximum capacity of system 639 @Test 640 public void testCoDelScheduling() throws Exception { 641 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 642 envEdge.threadNamePrefixes.add("RpcServer.default.FPBQ.Codel.handler"); 643 Configuration schedConf = HBaseConfiguration.create(); 644 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 645 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 646 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 647 schedConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 5); 648 PriorityFunction priority = mock(PriorityFunction.class); 649 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 650 SimpleRpcScheduler scheduler = 651 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 652 653 try { 654 scheduler.start(); 655 EnvironmentEdgeManager.injectEdge(envEdge); 656 657 // incoming traffic < capacity 658 envEdge.startTestFor(20, 40); 659 for (int i = 0; i < 100; i++) { 660 long time = EnvironmentEdgeManager.currentTime(); 661 CallRunner cr = getMockedCallRunner(time, 2); 662 scheduler.dispatch(cr); 663 Thread.sleep(50); 664 } 665 // LOG.info("Loop done"); 666 // make sure fast calls are handled 667 waitUntilQueueEmpty(scheduler); 668 Thread.sleep(100); 669 assertEquals(0, scheduler.getNumGeneralCallsDropped(), 670 "None of these calls should have been discarded"); 671 672 // incoming traffic = capacity 673 envEdge.startTestFor(20, 50); 674 for (int i = 0; i < 20; i++) { 675 long time = EnvironmentEdgeManager.currentTime(); 676 CallRunner cr = getMockedCallRunner(time, 2); 677 scheduler.dispatch(cr); 678 // We have mocked the arrival time and the time request get picked from queue but we need to 679 // also make sure that queue fill at the arrival rate 680 Thread.sleep(50); 681 } 682 683 // make sure somewhat slow calls are handled 684 waitUntilQueueEmpty(scheduler); 685 Thread.sleep(100); 686 assertEquals(0, scheduler.getNumGeneralCallsDropped(), 687 "None of these calls should have been discarded"); 688 689 // incoming traffic > capacity 690 envEdge.startTestFor(20, 60); 691 // now slow calls and the ones to be dropped 692 for (int i = 0; i < 60; i++) { 693 long time = EnvironmentEdgeManager.currentTime(); 694 CallRunner cr = getMockedCallRunner(time, 100); 695 scheduler.dispatch(cr); 696 Thread.sleep(50); 697 } 698 699 // make sure somewhat slow calls are handled 700 waitUntilQueueEmpty(scheduler); 701 Thread.sleep(100); 702 // 60 calls will take 3 second with 20 rps. And in 3 second with 60 processing time we can 703 // only serve 50 request 704 assertTrue(requestProcessed.get() >= 48, 705 "Number of processed requests should be greater then 90% of capacity" 706 + requestProcessed.get()); 707 assertTrue(scheduler.getNumGeneralCallsDropped() >= 9, 708 "There should have been at least 9 calls dropped however there were " 709 + scheduler.getNumGeneralCallsDropped()); 710 711 } finally { 712 scheduler.stop(); 713 } 714 } 715 716 @Test 717 public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { 718 String name = testMethodName; 719 int handlerCount = 1; 720 String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; 721 int maxQueueLength = 0; 722 PriorityFunction priority = mock(PriorityFunction.class); 723 Configuration conf = HBaseConfiguration.create(); 724 Abortable abortable = mock(Abortable.class); 725 FastPathBalancedQueueRpcExecutor executor = 726 Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, 727 maxQueueLength, priority, conf, abortable)); 728 CallRunner task = mock(CallRunner.class); 729 assertFalse(executor.dispatch(task)); 730 // make sure we never internally get a handler, which would skip the queue validation 731 Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), 732 Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 733 } 734 735 @Test 736 public void testMetaRWScanQueues() throws Exception { 737 Configuration schedConf = HBaseConfiguration.create(); 738 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 739 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 740 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 741 742 PriorityFunction priority = mock(PriorityFunction.class); 743 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 744 745 RpcScheduler scheduler = 746 new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD); 747 try { 748 scheduler.start(); 749 750 CallRunner putCallTask = mock(CallRunner.class); 751 ServerCall putCall = mock(ServerCall.class); 752 putCall.param = 753 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 754 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 755 when(putCallTask.getRpcCall()).thenReturn(putCall); 756 when(putCall.getHeader()).thenReturn(putHead); 757 when(putCall.getParam()).thenReturn(putCall.param); 758 759 CallRunner clientReadCallTask = mock(CallRunner.class); 760 ServerCall clientReadCall = mock(ServerCall.class); 761 RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build(); 762 when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall); 763 when(clientReadCall.getHeader()).thenReturn(clientReadHead); 764 765 CallRunner internalReadCallTask = mock(CallRunner.class); 766 ServerCall internalReadCall = mock(ServerCall.class); 767 internalReadCall.param = ScanRequest.newBuilder().build(); 768 RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan") 769 .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build(); 770 when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall); 771 when(internalReadCall.getHeader()).thenReturn(masterReadHead); 772 when(internalReadCall.getParam()).thenReturn(internalReadCall.param); 773 774 ArrayList<Integer> work = new ArrayList<>(); 775 doAnswerTaskExecution(putCallTask, work, 1, 1000); 776 doAnswerTaskExecution(clientReadCallTask, work, 2, 1000); 777 doAnswerTaskExecution(internalReadCallTask, work, 3, 1000); 778 779 // There are 3 queues: [puts], [gets], [scans] 780 // so the calls will be interleaved 781 scheduler.dispatch(putCallTask); 782 scheduler.dispatch(putCallTask); 783 scheduler.dispatch(putCallTask); 784 scheduler.dispatch(clientReadCallTask); 785 scheduler.dispatch(clientReadCallTask); 786 scheduler.dispatch(clientReadCallTask); 787 scheduler.dispatch(internalReadCallTask); 788 scheduler.dispatch(internalReadCallTask); 789 scheduler.dispatch(internalReadCallTask); 790 791 while (work.size() < 6) { 792 Thread.sleep(100); 793 } 794 795 for (int i = 0; i < work.size() - 2; i += 3) { 796 assertNotEquals(work.get(i + 0), work.get(i + 1)); 797 assertNotEquals(work.get(i + 0), work.get(i + 2)); 798 assertNotEquals(work.get(i + 1), work.get(i + 2)); 799 } 800 } finally { 801 scheduler.stop(); 802 } 803 } 804 805 // Get mocked call that has the CallRunner sleep for a while so that the fast 806 // path isn't hit. 807 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 808 ServerCall putCall = new ServerCall(1, null, null, 809 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 810 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 811 null, null, 9, null, timestamp, 0, null, null, null) { 812 813 @Override 814 public void sendResponseIfReady() throws IOException { 815 } 816 }; 817 818 return new CallRunner(null, putCall) { 819 @Override 820 public void run() { 821 if (sleepTime <= 0) { 822 return; 823 } 824 try { 825 Thread.sleep(sleepTime); 826 requestProcessed.incrementAndGet(); 827 } catch (InterruptedException e) { 828 } 829 } 830 831 @Override 832 public RpcCall getRpcCall() { 833 return putCall; 834 } 835 836 @Override 837 public void drop() { 838 } 839 }; 840 } 841 842 /** 843 * Test LIFO switching behavior through actual RPC calls. This test verifies that when the queue 844 * fills beyond the LIFO threshold, newer calls are processed before older calls (LIFO mode). 845 */ 846 @Test 847 public void testCoDelLifoWithRpcCalls() throws Exception { 848 Configuration testConf = HBaseConfiguration.create(); 849 testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 850 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 851 int maxCallQueueLength = 50; 852 double coDelLifoThreshold = 0.8; 853 testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, maxCallQueueLength); 854 testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, coDelLifoThreshold); 855 testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 100); 856 testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_INTERVAL, 100); 857 testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); // Single handler to control 858 // processing 859 860 PriorityFunction priority = mock(PriorityFunction.class); 861 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 862 SimpleRpcScheduler scheduler = 863 new SimpleRpcScheduler(testConf, 1, 0, 0, priority, HConstants.QOS_THRESHOLD); 864 865 try { 866 scheduler.init(CONTEXT); 867 scheduler.start(); 868 869 // Track completion order 870 final List<Integer> completedCalls = Collections.synchronizedList(new ArrayList<>()); 871 872 // Dispatch many slow calls rapidly to fill the queue beyond 80% threshold 873 // With queue limit of 50, we need > 40 calls to cross 80% 874 int numCalls = 48; 875 for (int i = 0; i < numCalls; i++) { 876 final int callId = i; 877 CallRunner call = createMockTask(HConstants.NORMAL_QOS); 878 call.setStatus(new MonitoredRPCHandlerImpl("test")); 879 doAnswer(invocation -> { 880 completedCalls.add(callId); 881 Thread.sleep(100); // Slow processing to allow queue to build up 882 return null; 883 }).when(call).run(); 884 scheduler.dispatch(call); 885 // No delay between dispatches - rapidly fill the queue 886 } 887 888 // Wait for some calls to complete 889 await().atMost(2, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 3); 890 891 // Check that we had LIFO switches 892 long lifoSwitches = scheduler.getNumLifoModeSwitches(); 893 assertTrue(lifoSwitches > 0, 894 "Should have switched to LIFO mode at least once, but got: " + lifoSwitches); 895 896 // Verify LIFO behavior: Among first completed calls, we should see higher call IDs 897 // (indicating later dispatched calls completed first) 898 int maxCallIdCompleted = -1; 899 for (int i = 0; i < completedCalls.size(); i++) { 900 maxCallIdCompleted = Math.max(maxCallIdCompleted, completedCalls.get(i)); 901 } 902 // At least one of the early completed calls should have a high ID (>20) 903 // indicating LIFO processing 904 assertTrue(maxCallIdCompleted > maxCallQueueLength * coDelLifoThreshold, 905 "Expected LIFO behavior: early completed calls should include call arrived after threshold " 906 + "maxCallIdCompleted: " + maxCallIdCompleted); 907 908 } finally { 909 scheduler.stop(); 910 } 911 } 912 913 /** 914 * Test that CoDel queue returns to FIFO mode after draining below threshold. 915 */ 916 @Test 917 public void testCoDelQueueDrainAndFifoReturn() throws Exception { 918 Configuration testConf = HBaseConfiguration.create(); 919 testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 920 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 921 testConf.setLong(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 100); 922 testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 50); 923 testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, 0.8); 924 testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); 925 926 PriorityFunction priority = mock(PriorityFunction.class); 927 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 928 SimpleRpcScheduler scheduler = 929 new SimpleRpcScheduler(testConf, 2, 0, 0, priority, HConstants.QOS_THRESHOLD); 930 931 try { 932 scheduler.init(CONTEXT); 933 scheduler.start(); 934 935 final List<Integer> completedCalls = Collections.synchronizedList(new ArrayList<>()); 936 937 // Fill queue rapidly to trigger LIFO (>40 calls for 80% of 50) 938 for (int i = 0; i < 48; i++) { 939 final int callId = i; 940 CallRunner call = createMockTask(HConstants.NORMAL_QOS); 941 call.setStatus(new MonitoredRPCHandlerImpl("test")); 942 doAnswer(invocation -> { 943 completedCalls.add(callId); 944 Thread.sleep(80); 945 return null; 946 }).when(call).run(); 947 scheduler.dispatch(call); 948 } 949 950 // Wait for calls to complete 951 await().atMost(1, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 3); 952 assertTrue(scheduler.getNumLifoModeSwitches() > 0, "Should have entered LIFO mode"); 953 954 await().atMost(2, TimeUnit.SECONDS).until( 955 () -> scheduler.getGeneralQueueLength() == 0 && scheduler.getActiveRpcHandlerCount() == 0); 956 957 long pastNumLifoModeSwitches = scheduler.getNumLifoModeSwitches(); 958 // Send new calls - should process in FIFO order 959 completedCalls.clear(); 960 for (int i = 100; i < 105; i++) { 961 final int callId = i; 962 CallRunner call = createMockTask(HConstants.NORMAL_QOS); 963 call.setStatus(new MonitoredRPCHandlerImpl("test")); 964 doAnswer(invocation -> { 965 completedCalls.add(callId); 966 Thread.sleep(50); 967 return null; 968 }).when(call).run(); 969 scheduler.dispatch(call); 970 } 971 972 // Wait for these calls to complete 973 await().atMost(2, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 2); 974 975 long newLifoSwitch = scheduler.getNumLifoModeSwitches() - pastNumLifoModeSwitches; 976 // Allow at most 1 violation due to concurrent execution by 2 handlers 977 assertEquals(0, newLifoSwitch, 978 "Queue should not switch to LIFO last 5 calls but number of LIFO switch are : " 979 + newLifoSwitch); 980 } finally { 981 scheduler.stop(); 982 } 983 } 984 985}