001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotEquals; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.timeout; 030import static org.mockito.Mockito.verify; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.lang.reflect.Field; 035import java.net.InetSocketAddress; 036import java.util.ArrayList; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.BlockingQueue; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.LinkedBlockingQueue; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.Abortable; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 050import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RPCTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.EnvironmentEdge; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.Threads; 057import org.junit.jupiter.api.BeforeEach; 058import org.junit.jupiter.api.Tag; 059import org.junit.jupiter.api.Test; 060import org.junit.jupiter.api.TestInfo; 061import org.mockito.Mockito; 062import org.mockito.invocation.InvocationOnMock; 063import org.mockito.stubbing.Answer; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 070import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 071 072import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 076 077@Tag(RPCTests.TAG) 078@Tag(MediumTests.TAG) 079public class TestSimpleRpcScheduler { 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 082 083 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 084 @Override 085 public InetSocketAddress getListenerAddress() { 086 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 087 } 088 }; 089 private Configuration conf; 090 private String testMethodName; 091 092 @BeforeEach 093 public void setUp(TestInfo testInfo) { 094 testMethodName = testInfo.getTestMethod().get().getName(); 095 conf = HBaseConfiguration.create(); 096 } 097 098 @Test 099 public void testBasic() throws IOException, InterruptedException { 100 101 PriorityFunction qosFunction = mock(PriorityFunction.class); 102 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0); 103 scheduler.init(CONTEXT); 104 scheduler.start(); 105 CallRunner task = createMockTask(HConstants.NORMAL_QOS); 106 task.setStatus(new MonitoredRPCHandlerImpl("test")); 107 scheduler.dispatch(task); 108 verify(task, timeout(10000)).run(); 109 scheduler.stop(); 110 } 111 112 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 113 try { 114 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 115 ExecutorField.setAccessible(true); 116 117 RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler); 118 119 Field handlerCountField = 120 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount"); 121 122 handlerCountField.setAccessible(true); 123 handlerCountField.set(rpcExecutor, 0); 124 125 Field numCallQueuesField = 126 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues"); 127 128 numCallQueuesField.setAccessible(true); 129 numCallQueuesField.set(rpcExecutor, 1); 130 131 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass() 132 .getDeclaredField("currentQueueLimit"); 133 134 currentQueueLimitField.setAccessible(true); 135 currentQueueLimitField.set(rpcExecutor, 100); 136 137 } catch (NoSuchFieldException e) { 138 LOG.error("No such field exception" + e); 139 } catch (IllegalAccessException e) { 140 LOG.error("Illegal access exception" + e); 141 } 142 143 return scheduler; 144 } 145 146 @Test 147 public void testCallQueueInfo() throws IOException, InterruptedException { 148 149 PriorityFunction qosFunction = mock(PriorityFunction.class); 150 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0); 151 152 scheduler.init(CONTEXT); 153 154 // Set the handlers to zero. So that number of requests in call Queue can be tested 155 scheduler = disableHandlers(scheduler); 156 scheduler.start(); 157 158 int totalCallMethods = 10; 159 for (int i = totalCallMethods; i > 0; i--) { 160 CallRunner task = createMockTask(HConstants.NORMAL_QOS); 161 task.setStatus(new MonitoredRPCHandlerImpl("test")); 162 scheduler.dispatch(task); 163 } 164 165 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 166 167 for (String callQueueName : callQueueInfo.getCallQueueNames()) { 168 169 for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) { 170 assertEquals(totalCallMethods, 171 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 172 } 173 174 } 175 176 scheduler.stop(); 177 178 } 179 180 @Test 181 public void testHandlerIsolation() throws IOException, InterruptedException { 182 CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS); 183 CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1); 184 CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS); 185 List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask); 186 Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask, 187 HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS); 188 PriorityFunction qosFunction = mock(PriorityFunction.class); 189 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 190 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 191 Answer<Void> answerToRun = new Answer<Void>() { 192 @Override 193 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 194 synchronized (handlerThreads) { 195 handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread()); 196 } 197 countDownLatch.countDown(); 198 return null; 199 } 200 }; 201 for (CallRunner task : tasks) { 202 task.setStatus(new MonitoredRPCHandlerImpl("test")); 203 doAnswer(answerToRun).when(task).run(); 204 } 205 206 RpcScheduler scheduler = 207 new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS); 208 scheduler.init(CONTEXT); 209 scheduler.start(); 210 for (CallRunner task : tasks) { 211 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 212 scheduler.dispatch(task); 213 } 214 for (CallRunner task : tasks) { 215 verify(task, timeout(10000)).run(); 216 } 217 scheduler.stop(); 218 219 // Tests that these requests are handled by three distinct threads. 220 countDownLatch.await(); 221 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 222 } 223 224 private CallRunner createMockTask(int priority) { 225 ServerCall call = mock(ServerCall.class); 226 CallRunner task = mock(CallRunner.class); 227 RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build(); 228 when(task.getRpcCall()).thenReturn(call); 229 when(call.getHeader()).thenReturn(header); 230 return task; 231 } 232 233 @Test 234 public void testRpcScheduler() throws Exception { 235 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 236 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 237 } 238 239 @Test 240 public void testPluggableRpcQueue() throws Exception { 241 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 242 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 243 244 try { 245 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass"); 246 fail("Expected a PluggableRpcQueueNotFound for unloaded class"); 247 } catch (PluggableRpcQueueNotFound e) { 248 // expected 249 } catch (Exception e) { 250 fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e); 251 } 252 253 try { 254 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 255 "org.apache.hadoop.hbase.ipc.SimpleRpcServer"); 256 fail("Expected a PluggableRpcQueueNotFound for incompatible class"); 257 } catch (PluggableRpcQueueNotFound e) { 258 // expected 259 } catch (Exception e) { 260 fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e); 261 } 262 } 263 264 @Test 265 public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception { 266 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 267 Configuration schedConf = HBaseConfiguration.create(); 268 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 269 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 270 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 271 schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true); 272 273 PriorityFunction priority = mock(PriorityFunction.class); 274 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 275 SimpleRpcScheduler scheduler = 276 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 277 278 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 279 f.setAccessible(true); 280 assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor); 281 } 282 283 @Test 284 public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception { 285 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 286 Configuration schedConf = HBaseConfiguration.create(); 287 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 288 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 289 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 290 291 PriorityFunction priority = mock(PriorityFunction.class); 292 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 293 SimpleRpcScheduler scheduler = 294 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 295 296 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 297 f.setAccessible(true); 298 assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor); 299 } 300 301 @Test 302 public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { 303 304 Configuration schedConf = HBaseConfiguration.create(); 305 306 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); 307 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 308 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 309 RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); 310 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 311 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 312 313 PriorityFunction priority = mock(PriorityFunction.class); 314 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 315 SimpleRpcScheduler scheduler = 316 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 317 try { 318 scheduler.start(); 319 320 CallRunner putCallTask = mock(CallRunner.class); 321 ServerCall putCall = mock(ServerCall.class); 322 putCall.param = 323 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 324 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 325 when(putCallTask.getRpcCall()).thenReturn(putCall); 326 when(putCall.getHeader()).thenReturn(putHead); 327 328 assertTrue(scheduler.dispatch(putCallTask)); 329 330 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4); 331 scheduler.onConfigurationChange(schedConf); 332 assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange()); 333 waitUntilQueueEmpty(scheduler); 334 } finally { 335 scheduler.stop(); 336 } 337 } 338 339 private void testRpcScheduler(final String queueType) throws Exception { 340 testRpcScheduler(queueType, null); 341 } 342 343 private void testRpcScheduler(final String queueType, final String pluggableQueueClass) 344 throws Exception { 345 Configuration schedConf = HBaseConfiguration.create(); 346 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 347 348 if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) { 349 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass); 350 } 351 352 PriorityFunction priority = mock(PriorityFunction.class); 353 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 354 355 RpcScheduler scheduler = 356 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 357 try { 358 scheduler.start(); 359 360 CallRunner smallCallTask = mock(CallRunner.class); 361 ServerCall smallCall = mock(ServerCall.class); 362 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 363 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 364 when(smallCall.getHeader()).thenReturn(smallHead); 365 366 CallRunner largeCallTask = mock(CallRunner.class); 367 ServerCall largeCall = mock(ServerCall.class); 368 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 369 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 370 when(largeCall.getHeader()).thenReturn(largeHead); 371 372 CallRunner hugeCallTask = mock(CallRunner.class); 373 ServerCall hugeCall = mock(ServerCall.class); 374 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 375 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 376 when(hugeCall.getHeader()).thenReturn(hugeHead); 377 378 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 379 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 380 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 381 382 final ArrayList<Integer> work = new ArrayList<>(); 383 doAnswerTaskExecution(smallCallTask, work, 10, 250); 384 doAnswerTaskExecution(largeCallTask, work, 50, 250); 385 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 386 387 scheduler.dispatch(smallCallTask); 388 scheduler.dispatch(smallCallTask); 389 scheduler.dispatch(smallCallTask); 390 scheduler.dispatch(hugeCallTask); 391 scheduler.dispatch(smallCallTask); 392 scheduler.dispatch(largeCallTask); 393 scheduler.dispatch(smallCallTask); 394 scheduler.dispatch(smallCallTask); 395 396 while (work.size() < 8) { 397 Thread.sleep(100); 398 } 399 400 int seqSum = 0; 401 int totalTime = 0; 402 for (int i = 0; i < work.size(); ++i) { 403 LOG.debug("Request i=" + i + " value=" + work.get(i)); 404 seqSum += work.get(i); 405 totalTime += seqSum; 406 } 407 LOG.debug("Total Time: " + totalTime); 408 409 // -> [small small small huge small large small small] 410 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 411 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 412 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 413 assertEquals(530, totalTime); 414 } else if ( 415 queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) 416 || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE) 417 ) { 418 assertEquals(930, totalTime); 419 } 420 } finally { 421 scheduler.stop(); 422 } 423 } 424 425 @Test 426 public void testScanQueueWithZeroScanRatio() throws Exception { 427 428 Configuration schedConf = HBaseConfiguration.create(); 429 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 430 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 431 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 432 433 PriorityFunction priority = mock(PriorityFunction.class); 434 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 435 436 RpcScheduler scheduler = 437 new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD); 438 assertNotEquals(null, scheduler); 439 } 440 441 @Test 442 public void testScanQueues() throws Exception { 443 Configuration schedConf = HBaseConfiguration.create(); 444 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 445 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 446 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 447 448 PriorityFunction priority = mock(PriorityFunction.class); 449 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 450 451 RpcScheduler scheduler = 452 new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); 453 try { 454 scheduler.start(); 455 456 CallRunner putCallTask = mock(CallRunner.class); 457 ServerCall putCall = mock(ServerCall.class); 458 putCall.param = 459 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 460 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 461 when(putCallTask.getRpcCall()).thenReturn(putCall); 462 when(putCall.getHeader()).thenReturn(putHead); 463 when(putCall.getParam()).thenReturn(putCall.param); 464 465 CallRunner getCallTask = mock(CallRunner.class); 466 ServerCall getCall = mock(ServerCall.class); 467 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 468 when(getCallTask.getRpcCall()).thenReturn(getCall); 469 when(getCall.getHeader()).thenReturn(getHead); 470 471 CallRunner scanCallTask = mock(CallRunner.class); 472 ServerCall scanCall = mock(ServerCall.class); 473 scanCall.param = ScanRequest.newBuilder().build(); 474 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 475 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 476 when(scanCall.getHeader()).thenReturn(scanHead); 477 when(scanCall.getParam()).thenReturn(scanCall.param); 478 479 CallRunner bulkLoadCallTask = mock(CallRunner.class); 480 ServerCall bulkLoadCall = mock(ServerCall.class); 481 bulkLoadCall.param = ScanRequest.newBuilder().build(); 482 RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build(); 483 when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall); 484 when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead); 485 when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param); 486 487 ArrayList<Integer> work = new ArrayList<>(); 488 doAnswerTaskExecution(putCallTask, work, 1, 1000); 489 doAnswerTaskExecution(getCallTask, work, 2, 1000); 490 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 491 doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000); 492 493 // There are 3 queues: [puts], [gets], [scans], [bulkload] 494 // so the calls will be interleaved 495 scheduler.dispatch(putCallTask); 496 scheduler.dispatch(putCallTask); 497 scheduler.dispatch(putCallTask); 498 scheduler.dispatch(getCallTask); 499 scheduler.dispatch(getCallTask); 500 scheduler.dispatch(getCallTask); 501 scheduler.dispatch(scanCallTask); 502 scheduler.dispatch(scanCallTask); 503 scheduler.dispatch(scanCallTask); 504 scheduler.dispatch(bulkLoadCallTask); 505 scheduler.dispatch(bulkLoadCallTask); 506 scheduler.dispatch(bulkLoadCallTask); 507 while (work.size() < 6) { 508 Thread.sleep(100); 509 } 510 511 for (int i = 0; i < work.size() - 2; i += 3) { 512 assertNotEquals(work.get(i + 0), work.get(i + 1)); 513 assertNotEquals(work.get(i + 0), work.get(i + 2)); 514 assertNotEquals(work.get(i + 1), work.get(i + 2)); 515 } 516 } finally { 517 scheduler.stop(); 518 } 519 } 520 521 private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results, 522 final int value, final int sleepInterval) { 523 callTask.setStatus(new MonitoredRPCHandlerImpl("test")); 524 doAnswer(new Answer<Object>() { 525 @Override 526 public Object answer(InvocationOnMock invocation) { 527 synchronized (results) { 528 results.add(value); 529 } 530 Threads.sleepWithoutInterrupt(sleepInterval); 531 return null; 532 } 533 }).when(callTask).run(); 534 } 535 536 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 537 throws InterruptedException { 538 while (scheduler.getGeneralQueueLength() > 0) { 539 Thread.sleep(100); 540 } 541 } 542 543 @Test 544 public void testSoftAndHardQueueLimits() throws Exception { 545 546 Configuration schedConf = HBaseConfiguration.create(); 547 548 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 549 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 550 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 551 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 552 553 PriorityFunction priority = mock(PriorityFunction.class); 554 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 555 SimpleRpcScheduler scheduler = 556 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 557 try { 558 scheduler.start(); 559 560 CallRunner putCallTask = mock(CallRunner.class); 561 ServerCall putCall = mock(ServerCall.class); 562 putCall.param = 563 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 564 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 565 when(putCallTask.getRpcCall()).thenReturn(putCall); 566 when(putCall.getHeader()).thenReturn(putHead); 567 568 assertTrue(scheduler.dispatch(putCallTask)); 569 570 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 571 scheduler.onConfigurationChange(schedConf); 572 assertFalse(scheduler.dispatch(putCallTask)); 573 waitUntilQueueEmpty(scheduler); 574 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 575 scheduler.onConfigurationChange(schedConf); 576 assertTrue(scheduler.dispatch(putCallTask)); 577 } finally { 578 scheduler.stop(); 579 } 580 } 581 582 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 583 584 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 585 586 private long offset; 587 588 private final Set<String> threadNamePrefixs = new HashSet<>(); 589 590 @Override 591 public long currentTime() { 592 for (String threadNamePrefix : threadNamePrefixs) { 593 String threadName = Thread.currentThread().getName(); 594 if (threadName.startsWith(threadNamePrefix)) { 595 if (timeQ != null) { 596 Long qTime = timeQ.poll(); 597 if (qTime != null) { 598 return qTime.longValue() + offset; 599 } 600 } 601 } 602 } 603 return System.currentTimeMillis(); 604 } 605 } 606 607 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay 608 // from the get go. So we are always overloaded. The test below would seem to complete the 609 // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping 610 // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for 611 // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e. 612 // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath 613 // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, 614 // numCallQueues... Codel is hard to test. This test is going to be flakey given it all 615 // timer-based. Disabling for now till chat with authors. 616 @Test 617 public void testCoDelScheduling() throws Exception { 618 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 619 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 620 Configuration schedConf = HBaseConfiguration.create(); 621 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 622 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 623 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 624 PriorityFunction priority = mock(PriorityFunction.class); 625 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 626 SimpleRpcScheduler scheduler = 627 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 628 try { 629 // Loading mocked call runner can take a good amount of time the first time through 630 // (haven't looked why). Load it for first time here outside of the timed loop. 631 getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2); 632 scheduler.start(); 633 EnvironmentEdgeManager.injectEdge(envEdge); 634 envEdge.offset = 5; 635 // Calls faster than min delay 636 // LOG.info("Start"); 637 for (int i = 0; i < 100; i++) { 638 long time = EnvironmentEdgeManager.currentTime(); 639 envEdge.timeQ.put(time); 640 CallRunner cr = getMockedCallRunner(time, 2); 641 scheduler.dispatch(cr); 642 } 643 // LOG.info("Loop done"); 644 // make sure fast calls are handled 645 waitUntilQueueEmpty(scheduler); 646 Thread.sleep(100); 647 assertEquals(0, scheduler.getNumGeneralCallsDropped(), 648 "None of these calls should have been discarded"); 649 650 envEdge.offset = 151; 651 // calls slower than min delay, but not individually slow enough to be dropped 652 for (int i = 0; i < 20; i++) { 653 long time = EnvironmentEdgeManager.currentTime(); 654 envEdge.timeQ.put(time); 655 CallRunner cr = getMockedCallRunner(time, 2); 656 scheduler.dispatch(cr); 657 } 658 659 // make sure somewhat slow calls are handled 660 waitUntilQueueEmpty(scheduler); 661 Thread.sleep(100); 662 assertEquals(0, scheduler.getNumGeneralCallsDropped(), 663 "None of these calls should have been discarded"); 664 665 envEdge.offset = 2000; 666 // now slow calls and the ones to be dropped 667 for (int i = 0; i < 60; i++) { 668 long time = EnvironmentEdgeManager.currentTime(); 669 envEdge.timeQ.put(time); 670 CallRunner cr = getMockedCallRunner(time, 100); 671 scheduler.dispatch(cr); 672 } 673 674 // make sure somewhat slow calls are handled 675 waitUntilQueueEmpty(scheduler); 676 Thread.sleep(100); 677 assertTrue(scheduler.getNumGeneralCallsDropped() > 12, 678 "There should have been at least 12 calls dropped however there were " 679 + scheduler.getNumGeneralCallsDropped()); 680 } finally { 681 scheduler.stop(); 682 } 683 } 684 685 @Test 686 public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { 687 String name = testMethodName; 688 int handlerCount = 1; 689 String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; 690 int maxQueueLength = 0; 691 PriorityFunction priority = mock(PriorityFunction.class); 692 Configuration conf = HBaseConfiguration.create(); 693 Abortable abortable = mock(Abortable.class); 694 FastPathBalancedQueueRpcExecutor executor = 695 Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, 696 maxQueueLength, priority, conf, abortable)); 697 CallRunner task = mock(CallRunner.class); 698 assertFalse(executor.dispatch(task)); 699 // make sure we never internally get a handler, which would skip the queue validation 700 Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), 701 Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 702 } 703 704 @Test 705 public void testMetaRWScanQueues() throws Exception { 706 Configuration schedConf = HBaseConfiguration.create(); 707 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 708 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 709 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 710 711 PriorityFunction priority = mock(PriorityFunction.class); 712 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 713 714 RpcScheduler scheduler = 715 new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD); 716 try { 717 scheduler.start(); 718 719 CallRunner putCallTask = mock(CallRunner.class); 720 ServerCall putCall = mock(ServerCall.class); 721 putCall.param = 722 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 723 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 724 when(putCallTask.getRpcCall()).thenReturn(putCall); 725 when(putCall.getHeader()).thenReturn(putHead); 726 when(putCall.getParam()).thenReturn(putCall.param); 727 728 CallRunner clientReadCallTask = mock(CallRunner.class); 729 ServerCall clientReadCall = mock(ServerCall.class); 730 RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build(); 731 when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall); 732 when(clientReadCall.getHeader()).thenReturn(clientReadHead); 733 734 CallRunner internalReadCallTask = mock(CallRunner.class); 735 ServerCall internalReadCall = mock(ServerCall.class); 736 internalReadCall.param = ScanRequest.newBuilder().build(); 737 RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan") 738 .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build(); 739 when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall); 740 when(internalReadCall.getHeader()).thenReturn(masterReadHead); 741 when(internalReadCall.getParam()).thenReturn(internalReadCall.param); 742 743 ArrayList<Integer> work = new ArrayList<>(); 744 doAnswerTaskExecution(putCallTask, work, 1, 1000); 745 doAnswerTaskExecution(clientReadCallTask, work, 2, 1000); 746 doAnswerTaskExecution(internalReadCallTask, work, 3, 1000); 747 748 // There are 3 queues: [puts], [gets], [scans] 749 // so the calls will be interleaved 750 scheduler.dispatch(putCallTask); 751 scheduler.dispatch(putCallTask); 752 scheduler.dispatch(putCallTask); 753 scheduler.dispatch(clientReadCallTask); 754 scheduler.dispatch(clientReadCallTask); 755 scheduler.dispatch(clientReadCallTask); 756 scheduler.dispatch(internalReadCallTask); 757 scheduler.dispatch(internalReadCallTask); 758 scheduler.dispatch(internalReadCallTask); 759 760 while (work.size() < 6) { 761 Thread.sleep(100); 762 } 763 764 for (int i = 0; i < work.size() - 2; i += 3) { 765 assertNotEquals(work.get(i + 0), work.get(i + 1)); 766 assertNotEquals(work.get(i + 0), work.get(i + 2)); 767 assertNotEquals(work.get(i + 1), work.get(i + 2)); 768 } 769 } finally { 770 scheduler.stop(); 771 } 772 } 773 774 // Get mocked call that has the CallRunner sleep for a while so that the fast 775 // path isn't hit. 776 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 777 ServerCall putCall = new ServerCall(1, null, null, 778 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 779 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 780 null, null, 9, null, timestamp, 0, null, null, null) { 781 782 @Override 783 public void sendResponseIfReady() throws IOException { 784 } 785 }; 786 787 return new CallRunner(null, putCall) { 788 @Override 789 public void run() { 790 if (sleepTime <= 0) { 791 return; 792 } 793 try { 794 LOG.warn("Sleeping for " + sleepTime); 795 Thread.sleep(sleepTime); 796 LOG.warn("Done Sleeping for " + sleepTime); 797 } catch (InterruptedException e) { 798 } 799 } 800 801 @Override 802 public RpcCall getRpcCall() { 803 return putCall; 804 } 805 806 @Override 807 public void drop() { 808 } 809 }; 810 } 811}