001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.eq; 026import static org.mockito.Mockito.doAnswer; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.timeout; 029import static org.mockito.Mockito.verify; 030import static org.mockito.Mockito.when; 031import java.io.IOException; 032import java.lang.reflect.Field; 033import java.net.InetSocketAddress; 034import java.util.ArrayList; 035import java.util.HashSet; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.CountDownLatch; 041import java.util.concurrent.LinkedBlockingQueue; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 048import org.apache.hadoop.hbase.testclassification.MediumTests; 049import org.apache.hadoop.hbase.testclassification.RPCTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdge; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.Threads; 054import org.junit.Before; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.mockito.invocation.InvocationOnMock; 059import org.mockito.stubbing.Answer; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 064import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 065import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 066import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 070 071@Category({RPCTests.class, MediumTests.class}) 072public class TestSimpleRpcScheduler { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class); 077 078 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 079 080 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 081 @Override 082 public InetSocketAddress getListenerAddress() { 083 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 084 } 085 }; 086 private Configuration conf; 087 088 @Before 089 public void setUp() { 090 conf = HBaseConfiguration.create(); 091 } 092 093 @Test 094 public void testBasic() throws IOException, InterruptedException { 095 096 PriorityFunction qosFunction = mock(PriorityFunction.class); 097 RpcScheduler scheduler = new SimpleRpcScheduler( 098 conf, 10, 0, 0, qosFunction, 0); 099 scheduler.init(CONTEXT); 100 scheduler.start(); 101 CallRunner task = createMockTask(); 102 task.setStatus(new MonitoredRPCHandlerImpl()); 103 scheduler.dispatch(task); 104 verify(task, timeout(10000)).run(); 105 scheduler.stop(); 106 } 107 108 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 109 try { 110 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 111 ExecutorField.setAccessible(true); 112 113 RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler); 114 115 Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass(). 116 getDeclaredField("handlerCount"); 117 118 handlerCountField.setAccessible(true); 119 handlerCountField.set(rpcExecutor, 0); 120 121 Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass(). 122 getDeclaredField("numCallQueues"); 123 124 numCallQueuesField.setAccessible(true); 125 numCallQueuesField.set(rpcExecutor, 1); 126 127 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass(). 128 getDeclaredField("currentQueueLimit"); 129 130 currentQueueLimitField.setAccessible(true); 131 currentQueueLimitField.set(rpcExecutor, 100); 132 133 } catch (NoSuchFieldException e) { 134 LOG.error("No such field exception"+e); 135 } catch (IllegalAccessException e) { 136 LOG.error("Illegal access exception"+e); 137 } 138 139 return scheduler; 140 } 141 142 @Test 143 public void testCallQueueInfo() throws IOException, InterruptedException { 144 145 PriorityFunction qosFunction = mock(PriorityFunction.class); 146 RpcScheduler scheduler = new SimpleRpcScheduler( 147 conf, 0, 0, 0, qosFunction, 0); 148 149 scheduler.init(CONTEXT); 150 151 // Set the handlers to zero. So that number of requests in call Queue can be tested 152 scheduler = disableHandlers(scheduler); 153 scheduler.start(); 154 155 int totalCallMethods = 10; 156 for (int i = totalCallMethods; i>0; i--) { 157 CallRunner task = createMockTask(); 158 task.setStatus(new MonitoredRPCHandlerImpl()); 159 scheduler.dispatch(task); 160 } 161 162 163 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 164 165 for (String callQueueName:callQueueInfo.getCallQueueNames()) { 166 167 for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) { 168 assertEquals(totalCallMethods, 169 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 170 } 171 172 } 173 174 scheduler.stop(); 175 176 } 177 178 179 @Test 180 public void testHandlerIsolation() throws IOException, InterruptedException { 181 CallRunner generalTask = createMockTask(); 182 CallRunner priorityTask = createMockTask(); 183 CallRunner replicationTask = createMockTask(); 184 List<CallRunner> tasks = ImmutableList.of( 185 generalTask, 186 priorityTask, 187 replicationTask); 188 Map<CallRunner, Integer> qos = ImmutableMap.of( 189 generalTask, 0, 190 priorityTask, HConstants.HIGH_QOS + 1, 191 replicationTask, HConstants.REPLICATION_QOS); 192 PriorityFunction qosFunction = mock(PriorityFunction.class); 193 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 194 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 195 Answer<Void> answerToRun = new Answer<Void>() { 196 @Override 197 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 198 synchronized (handlerThreads) { 199 handlerThreads.put( 200 (CallRunner) invocationOnMock.getMock(), 201 Thread.currentThread()); 202 } 203 countDownLatch.countDown(); 204 return null; 205 } 206 }; 207 for (CallRunner task : tasks) { 208 task.setStatus(new MonitoredRPCHandlerImpl()); 209 doAnswer(answerToRun).when(task).run(); 210 } 211 212 RpcScheduler scheduler = new SimpleRpcScheduler( 213 conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS); 214 scheduler.init(CONTEXT); 215 scheduler.start(); 216 for (CallRunner task : tasks) { 217 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 218 scheduler.dispatch(task); 219 } 220 for (CallRunner task : tasks) { 221 verify(task, timeout(10000)).run(); 222 } 223 scheduler.stop(); 224 225 // Tests that these requests are handled by three distinct threads. 226 countDownLatch.await(); 227 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 228 } 229 230 private CallRunner createMockTask() { 231 ServerCall call = mock(ServerCall.class); 232 CallRunner task = mock(CallRunner.class); 233 when(task.getRpcCall()).thenReturn(call); 234 return task; 235 } 236 237 @Test 238 public void testRpcScheduler() throws Exception { 239 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 240 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 241 } 242 243 private void testRpcScheduler(final String queueType) throws Exception { 244 Configuration schedConf = HBaseConfiguration.create(); 245 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 246 247 PriorityFunction priority = mock(PriorityFunction.class); 248 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 249 250 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, 251 HConstants.QOS_THRESHOLD); 252 try { 253 scheduler.start(); 254 255 CallRunner smallCallTask = mock(CallRunner.class); 256 ServerCall smallCall = mock(ServerCall.class); 257 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 258 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 259 when(smallCall.getHeader()).thenReturn(smallHead); 260 261 CallRunner largeCallTask = mock(CallRunner.class); 262 ServerCall largeCall = mock(ServerCall.class); 263 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 264 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 265 when(largeCall.getHeader()).thenReturn(largeHead); 266 267 CallRunner hugeCallTask = mock(CallRunner.class); 268 ServerCall hugeCall = mock(ServerCall.class); 269 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 270 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 271 when(hugeCall.getHeader()).thenReturn(hugeHead); 272 273 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 274 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 275 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 276 277 final ArrayList<Integer> work = new ArrayList<>(); 278 doAnswerTaskExecution(smallCallTask, work, 10, 250); 279 doAnswerTaskExecution(largeCallTask, work, 50, 250); 280 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 281 282 scheduler.dispatch(smallCallTask); 283 scheduler.dispatch(smallCallTask); 284 scheduler.dispatch(smallCallTask); 285 scheduler.dispatch(hugeCallTask); 286 scheduler.dispatch(smallCallTask); 287 scheduler.dispatch(largeCallTask); 288 scheduler.dispatch(smallCallTask); 289 scheduler.dispatch(smallCallTask); 290 291 while (work.size() < 8) { 292 Thread.sleep(100); 293 } 294 295 int seqSum = 0; 296 int totalTime = 0; 297 for (int i = 0; i < work.size(); ++i) { 298 LOG.debug("Request i=" + i + " value=" + work.get(i)); 299 seqSum += work.get(i); 300 totalTime += seqSum; 301 } 302 LOG.debug("Total Time: " + totalTime); 303 304 // -> [small small small huge small large small small] 305 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 306 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 307 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 308 assertEquals(530, totalTime); 309 } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) { 310 assertEquals(930, totalTime); 311 } 312 } finally { 313 scheduler.stop(); 314 } 315 } 316 317 @Test 318 public void testScanQueueWithZeroScanRatio() throws Exception { 319 320 Configuration schedConf = HBaseConfiguration.create(); 321 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 322 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 323 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 324 325 PriorityFunction priority = mock(PriorityFunction.class); 326 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 327 328 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, 329 HConstants.QOS_THRESHOLD); 330 assertNotEquals(null, scheduler); 331 } 332 333 @Test 334 public void testScanQueues() throws Exception { 335 Configuration schedConf = HBaseConfiguration.create(); 336 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 337 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 338 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 339 340 PriorityFunction priority = mock(PriorityFunction.class); 341 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 342 343 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, 344 HConstants.QOS_THRESHOLD); 345 try { 346 scheduler.start(); 347 348 CallRunner putCallTask = mock(CallRunner.class); 349 ServerCall putCall = mock(ServerCall.class); 350 putCall.param = RequestConverter.buildMutateRequest( 351 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 352 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 353 when(putCallTask.getRpcCall()).thenReturn(putCall); 354 when(putCall.getHeader()).thenReturn(putHead); 355 when(putCall.getParam()).thenReturn(putCall.param); 356 357 CallRunner getCallTask = mock(CallRunner.class); 358 ServerCall getCall = mock(ServerCall.class); 359 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 360 when(getCallTask.getRpcCall()).thenReturn(getCall); 361 when(getCall.getHeader()).thenReturn(getHead); 362 363 CallRunner scanCallTask = mock(CallRunner.class); 364 ServerCall scanCall = mock(ServerCall.class); 365 scanCall.param = ScanRequest.newBuilder().build(); 366 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 367 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 368 when(scanCall.getHeader()).thenReturn(scanHead); 369 when(scanCall.getParam()).thenReturn(scanCall.param); 370 371 ArrayList<Integer> work = new ArrayList<>(); 372 doAnswerTaskExecution(putCallTask, work, 1, 1000); 373 doAnswerTaskExecution(getCallTask, work, 2, 1000); 374 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 375 376 // There are 3 queues: [puts], [gets], [scans] 377 // so the calls will be interleaved 378 scheduler.dispatch(putCallTask); 379 scheduler.dispatch(putCallTask); 380 scheduler.dispatch(putCallTask); 381 scheduler.dispatch(getCallTask); 382 scheduler.dispatch(getCallTask); 383 scheduler.dispatch(getCallTask); 384 scheduler.dispatch(scanCallTask); 385 scheduler.dispatch(scanCallTask); 386 scheduler.dispatch(scanCallTask); 387 388 while (work.size() < 6) { 389 Thread.sleep(100); 390 } 391 392 for (int i = 0; i < work.size() - 2; i += 3) { 393 assertNotEquals(work.get(i + 0), work.get(i + 1)); 394 assertNotEquals(work.get(i + 0), work.get(i + 2)); 395 assertNotEquals(work.get(i + 1), work.get(i + 2)); 396 } 397 } finally { 398 scheduler.stop(); 399 } 400 } 401 402 private void doAnswerTaskExecution(final CallRunner callTask, 403 final ArrayList<Integer> results, final int value, final int sleepInterval) { 404 callTask.setStatus(new MonitoredRPCHandlerImpl()); 405 doAnswer(new Answer<Object>() { 406 @Override 407 public Object answer(InvocationOnMock invocation) { 408 synchronized (results) { 409 results.add(value); 410 } 411 Threads.sleepWithoutInterrupt(sleepInterval); 412 return null; 413 } 414 }).when(callTask).run(); 415 } 416 417 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 418 throws InterruptedException { 419 while (scheduler.getGeneralQueueLength() > 0) { 420 Thread.sleep(100); 421 } 422 } 423 424 @Test 425 public void testSoftAndHardQueueLimits() throws Exception { 426 427 Configuration schedConf = HBaseConfiguration.create(); 428 429 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 430 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 431 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 432 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 433 434 PriorityFunction priority = mock(PriorityFunction.class); 435 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 436 SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, 437 HConstants.QOS_THRESHOLD); 438 try { 439 scheduler.start(); 440 441 CallRunner putCallTask = mock(CallRunner.class); 442 ServerCall putCall = mock(ServerCall.class); 443 putCall.param = RequestConverter.buildMutateRequest( 444 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 445 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 446 when(putCallTask.getRpcCall()).thenReturn(putCall); 447 when(putCall.getHeader()).thenReturn(putHead); 448 449 assertTrue(scheduler.dispatch(putCallTask)); 450 451 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 452 scheduler.onConfigurationChange(schedConf); 453 assertFalse(scheduler.dispatch(putCallTask)); 454 waitUntilQueueEmpty(scheduler); 455 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 456 scheduler.onConfigurationChange(schedConf); 457 assertTrue(scheduler.dispatch(putCallTask)); 458 } finally { 459 scheduler.stop(); 460 } 461 } 462 463 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 464 465 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 466 467 private long offset; 468 469 private final Set<String> threadNamePrefixs = new HashSet<>(); 470 471 @Override 472 public long currentTime() { 473 for (String threadNamePrefix : threadNamePrefixs) { 474 String threadName = Thread.currentThread().getName(); 475 if (threadName.startsWith(threadNamePrefix)) { 476 return timeQ.poll().longValue() + offset; 477 } 478 } 479 return System.currentTimeMillis(); 480 } 481 } 482 483 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay 484 // from the get go. So we are always overloaded. The test below would seem to complete the 485 // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping 486 // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for 487 // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e. 488 // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath 489 // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, 490 // numCallQueues... Codel is hard to test. This test is going to be flakey given it all 491 // timer-based. Disabling for now till chat with authors. 492 @Test 493 public void testCoDelScheduling() throws Exception { 494 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 495 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 496 Configuration schedConf = HBaseConfiguration.create(); 497 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 498 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 499 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 500 PriorityFunction priority = mock(PriorityFunction.class); 501 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 502 SimpleRpcScheduler scheduler = 503 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 504 try { 505 // Loading mocked call runner can take a good amount of time the first time through 506 // (haven't looked why). Load it for first time here outside of the timed loop. 507 getMockedCallRunner(System.currentTimeMillis(), 2); 508 scheduler.start(); 509 EnvironmentEdgeManager.injectEdge(envEdge); 510 envEdge.offset = 5; 511 // Calls faster than min delay 512 // LOG.info("Start"); 513 for (int i = 0; i < 100; i++) { 514 long time = System.currentTimeMillis(); 515 envEdge.timeQ.put(time); 516 CallRunner cr = getMockedCallRunner(time, 2); 517 // LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr); 518 scheduler.dispatch(cr); 519 } 520 // LOG.info("Loop done"); 521 // make sure fast calls are handled 522 waitUntilQueueEmpty(scheduler); 523 Thread.sleep(100); 524 assertEquals("None of these calls should have been discarded", 0, 525 scheduler.getNumGeneralCallsDropped()); 526 527 envEdge.offset = 151; 528 // calls slower than min delay, but not individually slow enough to be dropped 529 for (int i = 0; i < 20; i++) { 530 long time = System.currentTimeMillis(); 531 envEdge.timeQ.put(time); 532 CallRunner cr = getMockedCallRunner(time, 2); 533 scheduler.dispatch(cr); 534 } 535 536 // make sure somewhat slow calls are handled 537 waitUntilQueueEmpty(scheduler); 538 Thread.sleep(100); 539 assertEquals("None of these calls should have been discarded", 0, 540 scheduler.getNumGeneralCallsDropped()); 541 542 envEdge.offset = 2000; 543 // now slow calls and the ones to be dropped 544 for (int i = 0; i < 60; i++) { 545 long time = System.currentTimeMillis(); 546 envEdge.timeQ.put(time); 547 CallRunner cr = getMockedCallRunner(time, 100); 548 scheduler.dispatch(cr); 549 } 550 551 // make sure somewhat slow calls are handled 552 waitUntilQueueEmpty(scheduler); 553 Thread.sleep(100); 554 assertTrue( 555 "There should have been at least 12 calls dropped however there were " 556 + scheduler.getNumGeneralCallsDropped(), 557 scheduler.getNumGeneralCallsDropped() > 12); 558 } finally { 559 scheduler.stop(); 560 } 561 } 562 563 @Test 564 public void testMetaRWScanQueues() throws Exception { 565 Configuration schedConf = HBaseConfiguration.create(); 566 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 567 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 568 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 569 570 PriorityFunction priority = mock(PriorityFunction.class); 571 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 572 573 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, 574 HConstants.QOS_THRESHOLD); 575 try { 576 scheduler.start(); 577 578 CallRunner putCallTask = mock(CallRunner.class); 579 ServerCall putCall = mock(ServerCall.class); 580 putCall.param = RequestConverter.buildMutateRequest( 581 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 582 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 583 when(putCallTask.getRpcCall()).thenReturn(putCall); 584 when(putCall.getHeader()).thenReturn(putHead); 585 when(putCall.getParam()).thenReturn(putCall.param); 586 587 CallRunner getCallTask = mock(CallRunner.class); 588 ServerCall getCall = mock(ServerCall.class); 589 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 590 when(getCallTask.getRpcCall()).thenReturn(getCall); 591 when(getCall.getHeader()).thenReturn(getHead); 592 593 CallRunner scanCallTask = mock(CallRunner.class); 594 ServerCall scanCall = mock(ServerCall.class); 595 scanCall.param = ScanRequest.newBuilder().build(); 596 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 597 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 598 when(scanCall.getHeader()).thenReturn(scanHead); 599 when(scanCall.getParam()).thenReturn(scanCall.param); 600 601 ArrayList<Integer> work = new ArrayList<>(); 602 doAnswerTaskExecution(putCallTask, work, 1, 1000); 603 doAnswerTaskExecution(getCallTask, work, 2, 1000); 604 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 605 606 // There are 3 queues: [puts], [gets], [scans] 607 // so the calls will be interleaved 608 scheduler.dispatch(putCallTask); 609 scheduler.dispatch(putCallTask); 610 scheduler.dispatch(putCallTask); 611 scheduler.dispatch(getCallTask); 612 scheduler.dispatch(getCallTask); 613 scheduler.dispatch(getCallTask); 614 scheduler.dispatch(scanCallTask); 615 scheduler.dispatch(scanCallTask); 616 scheduler.dispatch(scanCallTask); 617 618 while (work.size() < 6) { 619 Thread.sleep(100); 620 } 621 622 for (int i = 0; i < work.size() - 2; i += 3) { 623 assertNotEquals(work.get(i + 0), work.get(i + 1)); 624 assertNotEquals(work.get(i + 0), work.get(i + 2)); 625 assertNotEquals(work.get(i + 1), work.get(i + 2)); 626 } 627 } finally { 628 scheduler.stop(); 629 } 630 } 631 632 // Get mocked call that has the CallRunner sleep for a while so that the fast 633 // path isn't hit. 634 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 635 ServerCall putCall = new ServerCall(1, null, null, 636 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 637 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 638 null, null, 9, null, timestamp, 0, null, null, null) { 639 640 @Override 641 public void sendResponseIfReady() throws IOException { 642 } 643 }; 644 645 CallRunner cr = new CallRunner(null, putCall) { 646 @Override 647 public void run() { 648 if (sleepTime <= 0) { 649 return; 650 } 651 try { 652 LOG.warn("Sleeping for " + sleepTime); 653 Thread.sleep(sleepTime); 654 LOG.warn("Done Sleeping for " + sleepTime); 655 } catch (InterruptedException e) { 656 } 657 } 658 659 @Override 660 public RpcCall getRpcCall() { 661 return putCall; 662 } 663 664 @Override 665 public void drop() { 666 } 667 }; 668 669 return cr; 670 } 671}