001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.eq; 026import static org.mockito.Mockito.doAnswer; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.timeout; 029import static org.mockito.Mockito.verify; 030import static org.mockito.Mockito.when; 031import java.io.IOException; 032import java.lang.reflect.Field; 033import java.net.InetSocketAddress; 034import java.util.ArrayList; 035import java.util.HashSet; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.CountDownLatch; 041import java.util.concurrent.LinkedBlockingQueue; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.Abortable; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 049import org.apache.hadoop.hbase.testclassification.MediumTests; 050import org.apache.hadoop.hbase.testclassification.RPCTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.EnvironmentEdge; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.Threads; 055import org.junit.Before; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061import org.mockito.Mockito; 062import org.mockito.invocation.InvocationOnMock; 063import org.mockito.stubbing.Answer; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 069import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 074 075@Category({RPCTests.class, MediumTests.class}) 076public class TestSimpleRpcScheduler { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class); 081 082 @Rule 083 public TestName testName = new TestName(); 084 085 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 086 087 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 088 @Override 089 public InetSocketAddress getListenerAddress() { 090 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 091 } 092 }; 093 private Configuration conf; 094 095 @Before 096 public void setUp() { 097 conf = HBaseConfiguration.create(); 098 } 099 100 @Test 101 public void testBasic() throws IOException, InterruptedException { 102 103 PriorityFunction qosFunction = mock(PriorityFunction.class); 104 RpcScheduler scheduler = new SimpleRpcScheduler( 105 conf, 10, 0, 0, qosFunction, 0); 106 scheduler.init(CONTEXT); 107 scheduler.start(); 108 CallRunner task = createMockTask(); 109 task.setStatus(new MonitoredRPCHandlerImpl()); 110 scheduler.dispatch(task); 111 verify(task, timeout(10000)).run(); 112 scheduler.stop(); 113 } 114 115 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 116 try { 117 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 118 ExecutorField.setAccessible(true); 119 120 RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler); 121 122 Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass(). 123 getDeclaredField("handlerCount"); 124 125 handlerCountField.setAccessible(true); 126 handlerCountField.set(rpcExecutor, 0); 127 128 Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass(). 129 getDeclaredField("numCallQueues"); 130 131 numCallQueuesField.setAccessible(true); 132 numCallQueuesField.set(rpcExecutor, 1); 133 134 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass(). 135 getDeclaredField("currentQueueLimit"); 136 137 currentQueueLimitField.setAccessible(true); 138 currentQueueLimitField.set(rpcExecutor, 100); 139 140 } catch (NoSuchFieldException e) { 141 LOG.error("No such field exception"+e); 142 } catch (IllegalAccessException e) { 143 LOG.error("Illegal access exception"+e); 144 } 145 146 return scheduler; 147 } 148 149 @Test 150 public void testCallQueueInfo() throws IOException, InterruptedException { 151 152 PriorityFunction qosFunction = mock(PriorityFunction.class); 153 RpcScheduler scheduler = new SimpleRpcScheduler( 154 conf, 0, 0, 0, qosFunction, 0); 155 156 scheduler.init(CONTEXT); 157 158 // Set the handlers to zero. So that number of requests in call Queue can be tested 159 scheduler = disableHandlers(scheduler); 160 scheduler.start(); 161 162 int totalCallMethods = 10; 163 for (int i = totalCallMethods; i>0; i--) { 164 CallRunner task = createMockTask(); 165 task.setStatus(new MonitoredRPCHandlerImpl()); 166 scheduler.dispatch(task); 167 } 168 169 170 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 171 172 for (String callQueueName:callQueueInfo.getCallQueueNames()) { 173 174 for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) { 175 assertEquals(totalCallMethods, 176 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 177 } 178 179 } 180 181 scheduler.stop(); 182 183 } 184 185 186 @Test 187 public void testHandlerIsolation() throws IOException, InterruptedException { 188 CallRunner generalTask = createMockTask(); 189 CallRunner priorityTask = createMockTask(); 190 CallRunner replicationTask = createMockTask(); 191 List<CallRunner> tasks = ImmutableList.of( 192 generalTask, 193 priorityTask, 194 replicationTask); 195 Map<CallRunner, Integer> qos = ImmutableMap.of( 196 generalTask, 0, 197 priorityTask, HConstants.HIGH_QOS + 1, 198 replicationTask, HConstants.REPLICATION_QOS); 199 PriorityFunction qosFunction = mock(PriorityFunction.class); 200 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 201 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 202 Answer<Void> answerToRun = new Answer<Void>() { 203 @Override 204 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 205 synchronized (handlerThreads) { 206 handlerThreads.put( 207 (CallRunner) invocationOnMock.getMock(), 208 Thread.currentThread()); 209 } 210 countDownLatch.countDown(); 211 return null; 212 } 213 }; 214 for (CallRunner task : tasks) { 215 task.setStatus(new MonitoredRPCHandlerImpl()); 216 doAnswer(answerToRun).when(task).run(); 217 } 218 219 RpcScheduler scheduler = new SimpleRpcScheduler( 220 conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS); 221 scheduler.init(CONTEXT); 222 scheduler.start(); 223 for (CallRunner task : tasks) { 224 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 225 scheduler.dispatch(task); 226 } 227 for (CallRunner task : tasks) { 228 verify(task, timeout(10000)).run(); 229 } 230 scheduler.stop(); 231 232 // Tests that these requests are handled by three distinct threads. 233 countDownLatch.await(); 234 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 235 } 236 237 private CallRunner createMockTask() { 238 ServerCall call = mock(ServerCall.class); 239 CallRunner task = mock(CallRunner.class); 240 when(task.getRpcCall()).thenReturn(call); 241 return task; 242 } 243 244 @Test 245 public void testRpcScheduler() throws Exception { 246 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 247 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 248 } 249 250 private void testRpcScheduler(final String queueType) throws Exception { 251 Configuration schedConf = HBaseConfiguration.create(); 252 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 253 254 PriorityFunction priority = mock(PriorityFunction.class); 255 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 256 257 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, 258 HConstants.QOS_THRESHOLD); 259 try { 260 scheduler.start(); 261 262 CallRunner smallCallTask = mock(CallRunner.class); 263 ServerCall smallCall = mock(ServerCall.class); 264 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 265 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 266 when(smallCall.getHeader()).thenReturn(smallHead); 267 268 CallRunner largeCallTask = mock(CallRunner.class); 269 ServerCall largeCall = mock(ServerCall.class); 270 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 271 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 272 when(largeCall.getHeader()).thenReturn(largeHead); 273 274 CallRunner hugeCallTask = mock(CallRunner.class); 275 ServerCall hugeCall = mock(ServerCall.class); 276 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 277 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 278 when(hugeCall.getHeader()).thenReturn(hugeHead); 279 280 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 281 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 282 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 283 284 final ArrayList<Integer> work = new ArrayList<>(); 285 doAnswerTaskExecution(smallCallTask, work, 10, 250); 286 doAnswerTaskExecution(largeCallTask, work, 50, 250); 287 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 288 289 scheduler.dispatch(smallCallTask); 290 scheduler.dispatch(smallCallTask); 291 scheduler.dispatch(smallCallTask); 292 scheduler.dispatch(hugeCallTask); 293 scheduler.dispatch(smallCallTask); 294 scheduler.dispatch(largeCallTask); 295 scheduler.dispatch(smallCallTask); 296 scheduler.dispatch(smallCallTask); 297 298 while (work.size() < 8) { 299 Thread.sleep(100); 300 } 301 302 int seqSum = 0; 303 int totalTime = 0; 304 for (int i = 0; i < work.size(); ++i) { 305 LOG.debug("Request i=" + i + " value=" + work.get(i)); 306 seqSum += work.get(i); 307 totalTime += seqSum; 308 } 309 LOG.debug("Total Time: " + totalTime); 310 311 // -> [small small small huge small large small small] 312 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 313 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 314 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 315 assertEquals(530, totalTime); 316 } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) { 317 assertEquals(930, totalTime); 318 } 319 } finally { 320 scheduler.stop(); 321 } 322 } 323 324 @Test 325 public void testScanQueueWithZeroScanRatio() throws Exception { 326 327 Configuration schedConf = HBaseConfiguration.create(); 328 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 329 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 330 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 331 332 PriorityFunction priority = mock(PriorityFunction.class); 333 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 334 335 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, 336 HConstants.QOS_THRESHOLD); 337 assertNotEquals(null, scheduler); 338 } 339 340 @Test 341 public void testScanQueues() throws Exception { 342 Configuration schedConf = HBaseConfiguration.create(); 343 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 344 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 345 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 346 347 PriorityFunction priority = mock(PriorityFunction.class); 348 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 349 350 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, 351 HConstants.QOS_THRESHOLD); 352 try { 353 scheduler.start(); 354 355 CallRunner putCallTask = mock(CallRunner.class); 356 ServerCall putCall = mock(ServerCall.class); 357 putCall.param = RequestConverter.buildMutateRequest( 358 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 359 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 360 when(putCallTask.getRpcCall()).thenReturn(putCall); 361 when(putCall.getHeader()).thenReturn(putHead); 362 when(putCall.getParam()).thenReturn(putCall.param); 363 364 CallRunner getCallTask = mock(CallRunner.class); 365 ServerCall getCall = mock(ServerCall.class); 366 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 367 when(getCallTask.getRpcCall()).thenReturn(getCall); 368 when(getCall.getHeader()).thenReturn(getHead); 369 370 CallRunner scanCallTask = mock(CallRunner.class); 371 ServerCall scanCall = mock(ServerCall.class); 372 scanCall.param = ScanRequest.newBuilder().build(); 373 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 374 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 375 when(scanCall.getHeader()).thenReturn(scanHead); 376 when(scanCall.getParam()).thenReturn(scanCall.param); 377 378 ArrayList<Integer> work = new ArrayList<>(); 379 doAnswerTaskExecution(putCallTask, work, 1, 1000); 380 doAnswerTaskExecution(getCallTask, work, 2, 1000); 381 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 382 383 // There are 3 queues: [puts], [gets], [scans] 384 // so the calls will be interleaved 385 scheduler.dispatch(putCallTask); 386 scheduler.dispatch(putCallTask); 387 scheduler.dispatch(putCallTask); 388 scheduler.dispatch(getCallTask); 389 scheduler.dispatch(getCallTask); 390 scheduler.dispatch(getCallTask); 391 scheduler.dispatch(scanCallTask); 392 scheduler.dispatch(scanCallTask); 393 scheduler.dispatch(scanCallTask); 394 395 while (work.size() < 6) { 396 Thread.sleep(100); 397 } 398 399 for (int i = 0; i < work.size() - 2; i += 3) { 400 assertNotEquals(work.get(i + 0), work.get(i + 1)); 401 assertNotEquals(work.get(i + 0), work.get(i + 2)); 402 assertNotEquals(work.get(i + 1), work.get(i + 2)); 403 } 404 } finally { 405 scheduler.stop(); 406 } 407 } 408 409 private void doAnswerTaskExecution(final CallRunner callTask, 410 final ArrayList<Integer> results, final int value, final int sleepInterval) { 411 callTask.setStatus(new MonitoredRPCHandlerImpl()); 412 doAnswer(new Answer<Object>() { 413 @Override 414 public Object answer(InvocationOnMock invocation) { 415 synchronized (results) { 416 results.add(value); 417 } 418 Threads.sleepWithoutInterrupt(sleepInterval); 419 return null; 420 } 421 }).when(callTask).run(); 422 } 423 424 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 425 throws InterruptedException { 426 while (scheduler.getGeneralQueueLength() > 0) { 427 Thread.sleep(100); 428 } 429 } 430 431 @Test 432 public void testSoftAndHardQueueLimits() throws Exception { 433 434 Configuration schedConf = HBaseConfiguration.create(); 435 436 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 437 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 438 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 439 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 440 441 PriorityFunction priority = mock(PriorityFunction.class); 442 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 443 SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, 444 HConstants.QOS_THRESHOLD); 445 try { 446 scheduler.start(); 447 448 CallRunner putCallTask = mock(CallRunner.class); 449 ServerCall putCall = mock(ServerCall.class); 450 putCall.param = RequestConverter.buildMutateRequest( 451 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 452 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 453 when(putCallTask.getRpcCall()).thenReturn(putCall); 454 when(putCall.getHeader()).thenReturn(putHead); 455 456 assertTrue(scheduler.dispatch(putCallTask)); 457 458 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 459 scheduler.onConfigurationChange(schedConf); 460 assertFalse(scheduler.dispatch(putCallTask)); 461 waitUntilQueueEmpty(scheduler); 462 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 463 scheduler.onConfigurationChange(schedConf); 464 assertTrue(scheduler.dispatch(putCallTask)); 465 } finally { 466 scheduler.stop(); 467 } 468 } 469 470 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 471 472 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 473 474 private long offset; 475 476 private final Set<String> threadNamePrefixs = new HashSet<>(); 477 478 @Override 479 public long currentTime() { 480 for (String threadNamePrefix : threadNamePrefixs) { 481 String threadName = Thread.currentThread().getName(); 482 if (threadName.startsWith(threadNamePrefix)) { 483 return timeQ.poll().longValue() + offset; 484 } 485 } 486 return System.currentTimeMillis(); 487 } 488 } 489 490 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay 491 // from the get go. So we are always overloaded. The test below would seem to complete the 492 // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping 493 // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for 494 // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e. 495 // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath 496 // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, 497 // numCallQueues... Codel is hard to test. This test is going to be flakey given it all 498 // timer-based. Disabling for now till chat with authors. 499 @Test 500 public void testCoDelScheduling() throws Exception { 501 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 502 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 503 Configuration schedConf = HBaseConfiguration.create(); 504 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 505 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 506 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 507 PriorityFunction priority = mock(PriorityFunction.class); 508 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 509 SimpleRpcScheduler scheduler = 510 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 511 try { 512 // Loading mocked call runner can take a good amount of time the first time through 513 // (haven't looked why). Load it for first time here outside of the timed loop. 514 getMockedCallRunner(System.currentTimeMillis(), 2); 515 scheduler.start(); 516 EnvironmentEdgeManager.injectEdge(envEdge); 517 envEdge.offset = 5; 518 // Calls faster than min delay 519 // LOG.info("Start"); 520 for (int i = 0; i < 100; i++) { 521 long time = System.currentTimeMillis(); 522 envEdge.timeQ.put(time); 523 CallRunner cr = getMockedCallRunner(time, 2); 524 // LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr); 525 scheduler.dispatch(cr); 526 } 527 // LOG.info("Loop done"); 528 // make sure fast calls are handled 529 waitUntilQueueEmpty(scheduler); 530 Thread.sleep(100); 531 assertEquals("None of these calls should have been discarded", 0, 532 scheduler.getNumGeneralCallsDropped()); 533 534 envEdge.offset = 151; 535 // calls slower than min delay, but not individually slow enough to be dropped 536 for (int i = 0; i < 20; i++) { 537 long time = System.currentTimeMillis(); 538 envEdge.timeQ.put(time); 539 CallRunner cr = getMockedCallRunner(time, 2); 540 scheduler.dispatch(cr); 541 } 542 543 // make sure somewhat slow calls are handled 544 waitUntilQueueEmpty(scheduler); 545 Thread.sleep(100); 546 assertEquals("None of these calls should have been discarded", 0, 547 scheduler.getNumGeneralCallsDropped()); 548 549 envEdge.offset = 2000; 550 // now slow calls and the ones to be dropped 551 for (int i = 0; i < 60; i++) { 552 long time = System.currentTimeMillis(); 553 envEdge.timeQ.put(time); 554 CallRunner cr = getMockedCallRunner(time, 100); 555 scheduler.dispatch(cr); 556 } 557 558 // make sure somewhat slow calls are handled 559 waitUntilQueueEmpty(scheduler); 560 Thread.sleep(100); 561 assertTrue( 562 "There should have been at least 12 calls dropped however there were " 563 + scheduler.getNumGeneralCallsDropped(), 564 scheduler.getNumGeneralCallsDropped() > 12); 565 } finally { 566 scheduler.stop(); 567 } 568 } 569 570 @Test 571 public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { 572 String name = testName.getMethodName(); 573 int handlerCount = 1; 574 String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; 575 int maxQueueLength = 0; 576 PriorityFunction priority = mock(PriorityFunction.class); 577 Configuration conf = HBaseConfiguration.create(); 578 Abortable abortable = mock(Abortable.class); 579 FastPathBalancedQueueRpcExecutor executor = 580 Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, 581 handlerCount, callQueueType, maxQueueLength, priority, conf, abortable)); 582 CallRunner task = mock(CallRunner.class); 583 assertFalse(executor.dispatch(task)); 584 //make sure we never internally get a handler, which would skip the queue validation 585 Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), 586 Mockito.any(), Mockito.any()); 587 } 588 589 @Test 590 public void testMetaRWScanQueues() throws Exception { 591 Configuration schedConf = HBaseConfiguration.create(); 592 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 593 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 594 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 595 596 PriorityFunction priority = mock(PriorityFunction.class); 597 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 598 599 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, 600 HConstants.QOS_THRESHOLD); 601 try { 602 scheduler.start(); 603 604 CallRunner putCallTask = mock(CallRunner.class); 605 ServerCall putCall = mock(ServerCall.class); 606 putCall.param = RequestConverter.buildMutateRequest( 607 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 608 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 609 when(putCallTask.getRpcCall()).thenReturn(putCall); 610 when(putCall.getHeader()).thenReturn(putHead); 611 when(putCall.getParam()).thenReturn(putCall.param); 612 613 CallRunner getCallTask = mock(CallRunner.class); 614 ServerCall getCall = mock(ServerCall.class); 615 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 616 when(getCallTask.getRpcCall()).thenReturn(getCall); 617 when(getCall.getHeader()).thenReturn(getHead); 618 619 CallRunner scanCallTask = mock(CallRunner.class); 620 ServerCall scanCall = mock(ServerCall.class); 621 scanCall.param = ScanRequest.newBuilder().build(); 622 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 623 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 624 when(scanCall.getHeader()).thenReturn(scanHead); 625 when(scanCall.getParam()).thenReturn(scanCall.param); 626 627 ArrayList<Integer> work = new ArrayList<>(); 628 doAnswerTaskExecution(putCallTask, work, 1, 1000); 629 doAnswerTaskExecution(getCallTask, work, 2, 1000); 630 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 631 632 // There are 3 queues: [puts], [gets], [scans] 633 // so the calls will be interleaved 634 scheduler.dispatch(putCallTask); 635 scheduler.dispatch(putCallTask); 636 scheduler.dispatch(putCallTask); 637 scheduler.dispatch(getCallTask); 638 scheduler.dispatch(getCallTask); 639 scheduler.dispatch(getCallTask); 640 scheduler.dispatch(scanCallTask); 641 scheduler.dispatch(scanCallTask); 642 scheduler.dispatch(scanCallTask); 643 644 while (work.size() < 6) { 645 Thread.sleep(100); 646 } 647 648 for (int i = 0; i < work.size() - 2; i += 3) { 649 assertNotEquals(work.get(i + 0), work.get(i + 1)); 650 assertNotEquals(work.get(i + 0), work.get(i + 2)); 651 assertNotEquals(work.get(i + 1), work.get(i + 2)); 652 } 653 } finally { 654 scheduler.stop(); 655 } 656 } 657 658 // Get mocked call that has the CallRunner sleep for a while so that the fast 659 // path isn't hit. 660 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 661 ServerCall putCall = new ServerCall(1, null, null, 662 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 663 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 664 null, null, 9, null, timestamp, 0, null, null, null) { 665 666 @Override 667 public void sendResponseIfReady() throws IOException { 668 } 669 }; 670 671 CallRunner cr = new CallRunner(null, putCall) { 672 @Override 673 public void run() { 674 if (sleepTime <= 0) { 675 return; 676 } 677 try { 678 LOG.warn("Sleeping for " + sleepTime); 679 Thread.sleep(sleepTime); 680 LOG.warn("Done Sleeping for " + sleepTime); 681 } catch (InterruptedException e) { 682 } 683 } 684 685 @Override 686 public RpcCall getRpcCall() { 687 return putCall; 688 } 689 690 @Override 691 public void drop() { 692 } 693 }; 694 695 return cr; 696 } 697}