001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.eq; 026import static org.mockito.Mockito.doAnswer; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.timeout; 029import static org.mockito.Mockito.verify; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.lang.reflect.Field; 034import java.net.InetSocketAddress; 035import java.util.ArrayList; 036import java.util.HashSet; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.CountDownLatch; 042import java.util.concurrent.LinkedBlockingQueue; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 049import org.apache.hadoop.hbase.testclassification.RPCTests; 050import org.apache.hadoop.hbase.testclassification.SmallTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.EnvironmentEdge; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.Threads; 055import org.junit.Before; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.mockito.invocation.InvocationOnMock; 060import org.mockito.stubbing.Answer; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 065import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 067import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 068 069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 073 074@Category({RPCTests.class, SmallTests.class}) 075public class TestSimpleRpcScheduler { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 082 083 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 084 @Override 085 public InetSocketAddress getListenerAddress() { 086 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 087 } 088 }; 089 private Configuration conf; 090 091 @Before 092 public void setUp() { 093 conf = HBaseConfiguration.create(); 094 } 095 096 @Test 097 public void testBasic() throws IOException, InterruptedException { 098 099 PriorityFunction qosFunction = mock(PriorityFunction.class); 100 RpcScheduler scheduler = new SimpleRpcScheduler( 101 conf, 10, 0, 0, qosFunction, 0); 102 scheduler.init(CONTEXT); 103 scheduler.start(); 104 CallRunner task = createMockTask(); 105 task.setStatus(new MonitoredRPCHandlerImpl()); 106 scheduler.dispatch(task); 107 verify(task, timeout(10000)).run(); 108 scheduler.stop(); 109 } 110 111 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 112 try { 113 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 114 ExecutorField.setAccessible(true); 115 116 RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler); 117 118 Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount"); 119 120 handlerCountField.setAccessible(true); 121 handlerCountField.set(rpcExecutor, 0); 122 123 Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues"); 124 125 numCallQueuesField.setAccessible(true); 126 numCallQueuesField.set(rpcExecutor, 1); 127 128 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("currentQueueLimit"); 129 130 currentQueueLimitField.setAccessible(true); 131 currentQueueLimitField.set(rpcExecutor, 100); 132 133 } catch (NoSuchFieldException e) { 134 LOG.error("No such field exception"+e); 135 } catch (IllegalAccessException e) { 136 LOG.error("Illegal access exception"+e); 137 } 138 139 return scheduler; 140 } 141 142 @Test 143 public void testCallQueueInfo() throws IOException, InterruptedException { 144 145 PriorityFunction qosFunction = mock(PriorityFunction.class); 146 RpcScheduler scheduler = new SimpleRpcScheduler( 147 conf, 0, 0, 0, qosFunction, 0); 148 149 scheduler.init(CONTEXT); 150 151 // Set the handlers to zero. So that number of requests in call Queue can be tested 152 scheduler = disableHandlers(scheduler); 153 scheduler.start(); 154 155 int totalCallMethods = 10; 156 for (int i = totalCallMethods; i>0; i--) { 157 CallRunner task = createMockTask(); 158 task.setStatus(new MonitoredRPCHandlerImpl()); 159 scheduler.dispatch(task); 160 } 161 162 163 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 164 165 for (String callQueueName:callQueueInfo.getCallQueueNames()) { 166 167 for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) { 168 assertEquals(totalCallMethods, 169 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 170 } 171 172 } 173 174 scheduler.stop(); 175 176 } 177 178 179 @Test 180 public void testHandlerIsolation() throws IOException, InterruptedException { 181 CallRunner generalTask = createMockTask(); 182 CallRunner priorityTask = createMockTask(); 183 CallRunner replicationTask = createMockTask(); 184 List<CallRunner> tasks = ImmutableList.of( 185 generalTask, 186 priorityTask, 187 replicationTask); 188 Map<CallRunner, Integer> qos = ImmutableMap.of( 189 generalTask, 0, 190 priorityTask, HConstants.HIGH_QOS + 1, 191 replicationTask, HConstants.REPLICATION_QOS); 192 PriorityFunction qosFunction = mock(PriorityFunction.class); 193 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 194 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 195 Answer<Void> answerToRun = new Answer<Void>() { 196 @Override 197 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 198 synchronized (handlerThreads) { 199 handlerThreads.put( 200 (CallRunner) invocationOnMock.getMock(), 201 Thread.currentThread()); 202 } 203 countDownLatch.countDown(); 204 return null; 205 } 206 }; 207 for (CallRunner task : tasks) { 208 task.setStatus(new MonitoredRPCHandlerImpl()); 209 doAnswer(answerToRun).when(task).run(); 210 } 211 212 RpcScheduler scheduler = new SimpleRpcScheduler( 213 conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS); 214 scheduler.init(CONTEXT); 215 scheduler.start(); 216 for (CallRunner task : tasks) { 217 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 218 scheduler.dispatch(task); 219 } 220 for (CallRunner task : tasks) { 221 verify(task, timeout(10000)).run(); 222 } 223 scheduler.stop(); 224 225 // Tests that these requests are handled by three distinct threads. 226 countDownLatch.await(); 227 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 228 } 229 230 private CallRunner createMockTask() { 231 ServerCall call = mock(ServerCall.class); 232 CallRunner task = mock(CallRunner.class); 233 when(task.getRpcCall()).thenReturn(call); 234 return task; 235 } 236 237 @Test 238 public void testRpcScheduler() throws Exception { 239 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 240 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 241 } 242 243 private void testRpcScheduler(final String queueType) throws Exception { 244 Configuration schedConf = HBaseConfiguration.create(); 245 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 246 247 PriorityFunction priority = mock(PriorityFunction.class); 248 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 249 250 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, 251 HConstants.QOS_THRESHOLD); 252 try { 253 scheduler.start(); 254 255 CallRunner smallCallTask = mock(CallRunner.class); 256 ServerCall smallCall = mock(ServerCall.class); 257 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 258 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 259 when(smallCall.getHeader()).thenReturn(smallHead); 260 261 CallRunner largeCallTask = mock(CallRunner.class); 262 ServerCall largeCall = mock(ServerCall.class); 263 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 264 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 265 when(largeCall.getHeader()).thenReturn(largeHead); 266 267 CallRunner hugeCallTask = mock(CallRunner.class); 268 ServerCall hugeCall = mock(ServerCall.class); 269 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 270 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 271 when(hugeCall.getHeader()).thenReturn(hugeHead); 272 273 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 274 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 275 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 276 277 final ArrayList<Integer> work = new ArrayList<>(); 278 doAnswerTaskExecution(smallCallTask, work, 10, 250); 279 doAnswerTaskExecution(largeCallTask, work, 50, 250); 280 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 281 282 scheduler.dispatch(smallCallTask); 283 scheduler.dispatch(smallCallTask); 284 scheduler.dispatch(smallCallTask); 285 scheduler.dispatch(hugeCallTask); 286 scheduler.dispatch(smallCallTask); 287 scheduler.dispatch(largeCallTask); 288 scheduler.dispatch(smallCallTask); 289 scheduler.dispatch(smallCallTask); 290 291 while (work.size() < 8) { 292 Thread.sleep(100); 293 } 294 295 int seqSum = 0; 296 int totalTime = 0; 297 for (int i = 0; i < work.size(); ++i) { 298 LOG.debug("Request i=" + i + " value=" + work.get(i)); 299 seqSum += work.get(i); 300 totalTime += seqSum; 301 } 302 LOG.debug("Total Time: " + totalTime); 303 304 // -> [small small small huge small large small small] 305 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 306 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 307 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 308 assertEquals(530, totalTime); 309 } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) { 310 assertEquals(930, totalTime); 311 } 312 } finally { 313 scheduler.stop(); 314 } 315 } 316 317 @Test 318 public void testScanQueueWithZeroScanRatio() throws Exception { 319 320 Configuration schedConf = HBaseConfiguration.create(); 321 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 322 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 323 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 324 325 PriorityFunction priority = mock(PriorityFunction.class); 326 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 327 328 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, 329 HConstants.QOS_THRESHOLD); 330 assertNotEquals(null, scheduler); 331 } 332 333 @Test 334 public void testScanQueues() throws Exception { 335 Configuration schedConf = HBaseConfiguration.create(); 336 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 337 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 338 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 339 340 PriorityFunction priority = mock(PriorityFunction.class); 341 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 342 343 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, 344 HConstants.QOS_THRESHOLD); 345 try { 346 scheduler.start(); 347 348 CallRunner putCallTask = mock(CallRunner.class); 349 ServerCall putCall = mock(ServerCall.class); 350 putCall.param = RequestConverter.buildMutateRequest( 351 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 352 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 353 when(putCallTask.getRpcCall()).thenReturn(putCall); 354 when(putCall.getHeader()).thenReturn(putHead); 355 when(putCall.getParam()).thenReturn(putCall.param); 356 357 CallRunner getCallTask = mock(CallRunner.class); 358 ServerCall getCall = mock(ServerCall.class); 359 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 360 when(getCallTask.getRpcCall()).thenReturn(getCall); 361 when(getCall.getHeader()).thenReturn(getHead); 362 363 CallRunner scanCallTask = mock(CallRunner.class); 364 ServerCall scanCall = mock(ServerCall.class); 365 scanCall.param = ScanRequest.newBuilder().setScannerId(1).build(); 366 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 367 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 368 when(scanCall.getHeader()).thenReturn(scanHead); 369 when(scanCall.getParam()).thenReturn(scanCall.param); 370 371 ArrayList<Integer> work = new ArrayList<>(); 372 doAnswerTaskExecution(putCallTask, work, 1, 1000); 373 doAnswerTaskExecution(getCallTask, work, 2, 1000); 374 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 375 376 // There are 3 queues: [puts], [gets], [scans] 377 // so the calls will be interleaved 378 scheduler.dispatch(putCallTask); 379 scheduler.dispatch(putCallTask); 380 scheduler.dispatch(putCallTask); 381 scheduler.dispatch(getCallTask); 382 scheduler.dispatch(getCallTask); 383 scheduler.dispatch(getCallTask); 384 scheduler.dispatch(scanCallTask); 385 scheduler.dispatch(scanCallTask); 386 scheduler.dispatch(scanCallTask); 387 388 while (work.size() < 6) { 389 Thread.sleep(100); 390 } 391 392 for (int i = 0; i < work.size() - 2; i += 3) { 393 assertNotEquals(work.get(i + 0), work.get(i + 1)); 394 assertNotEquals(work.get(i + 0), work.get(i + 2)); 395 assertNotEquals(work.get(i + 1), work.get(i + 2)); 396 } 397 } finally { 398 scheduler.stop(); 399 } 400 } 401 402 private void doAnswerTaskExecution(final CallRunner callTask, 403 final ArrayList<Integer> results, final int value, final int sleepInterval) { 404 callTask.setStatus(new MonitoredRPCHandlerImpl()); 405 doAnswer(new Answer<Object>() { 406 @Override 407 public Object answer(InvocationOnMock invocation) { 408 synchronized (results) { 409 results.add(value); 410 } 411 Threads.sleepWithoutInterrupt(sleepInterval); 412 return null; 413 } 414 }).when(callTask).run(); 415 } 416 417 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 418 throws InterruptedException { 419 while (scheduler.getGeneralQueueLength() > 0) { 420 Thread.sleep(100); 421 } 422 } 423 424 @Test 425 public void testSoftAndHardQueueLimits() throws Exception { 426 427 Configuration schedConf = HBaseConfiguration.create(); 428 429 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 430 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 431 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 432 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 433 434 PriorityFunction priority = mock(PriorityFunction.class); 435 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 436 SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, 437 HConstants.QOS_THRESHOLD); 438 try { 439 scheduler.start(); 440 441 CallRunner putCallTask = mock(CallRunner.class); 442 ServerCall putCall = mock(ServerCall.class); 443 putCall.param = RequestConverter.buildMutateRequest( 444 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 445 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 446 when(putCallTask.getRpcCall()).thenReturn(putCall); 447 when(putCall.getHeader()).thenReturn(putHead); 448 449 assertTrue(scheduler.dispatch(putCallTask)); 450 451 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 452 scheduler.onConfigurationChange(schedConf); 453 assertFalse(scheduler.dispatch(putCallTask)); 454 waitUntilQueueEmpty(scheduler); 455 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 456 scheduler.onConfigurationChange(schedConf); 457 assertTrue(scheduler.dispatch(putCallTask)); 458 } finally { 459 scheduler.stop(); 460 } 461 } 462 463 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 464 465 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 466 467 private long offset; 468 469 private final Set<String> threadNamePrefixs = new HashSet<>(); 470 471 @Override 472 public long currentTime() { 473 for (String threadNamePrefix : threadNamePrefixs) { 474 String threadName = Thread.currentThread().getName(); 475 if (threadName.startsWith(threadNamePrefix)) { 476 return timeQ.poll().longValue() + offset; 477 } 478 } 479 return System.currentTimeMillis(); 480 } 481 } 482 483 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay from the get go. 484 // So we are always overloaded. The test below would seem to complete the queuing of all the CallRunners inside 485 // the codel check interval. I don't think we are skipping codel checking. Second, I think this test has been 486 // broken since HBASE-16089 Add on FastPath for CoDel went in. The thread name we were looking for was the name 487 // BEFORE we updated: i.e. "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel 488 // fastpath thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues... 489 // Codel is hard to test. This test is going to be flakey given it all timer-based. Disabling for now till chat 490 // with authors. 491 @Test 492 public void testCoDelScheduling() throws Exception { 493 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 494 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 495 Configuration schedConf = HBaseConfiguration.create(); 496 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 497 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 498 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 499 PriorityFunction priority = mock(PriorityFunction.class); 500 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 501 SimpleRpcScheduler scheduler = 502 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 503 try { 504 // Loading mocked call runner can take a good amount of time the first time through (haven't looked why). 505 // Load it for first time here outside of the timed loop. 506 getMockedCallRunner(System.currentTimeMillis(), 2); 507 scheduler.start(); 508 EnvironmentEdgeManager.injectEdge(envEdge); 509 envEdge.offset = 5; 510 // Calls faster than min delay 511 // LOG.info("Start"); 512 for (int i = 0; i < 100; i++) { 513 long time = System.currentTimeMillis(); 514 envEdge.timeQ.put(time); 515 CallRunner cr = getMockedCallRunner(time, 2); 516 // LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr); 517 scheduler.dispatch(cr); 518 } 519 // LOG.info("Loop done"); 520 // make sure fast calls are handled 521 waitUntilQueueEmpty(scheduler); 522 Thread.sleep(100); 523 assertEquals("None of these calls should have been discarded", 0, 524 scheduler.getNumGeneralCallsDropped()); 525 526 envEdge.offset = 151; 527 // calls slower than min delay, but not individually slow enough to be dropped 528 for (int i = 0; i < 20; i++) { 529 long time = System.currentTimeMillis(); 530 envEdge.timeQ.put(time); 531 CallRunner cr = getMockedCallRunner(time, 2); 532 scheduler.dispatch(cr); 533 } 534 535 // make sure somewhat slow calls are handled 536 waitUntilQueueEmpty(scheduler); 537 Thread.sleep(100); 538 assertEquals("None of these calls should have been discarded", 0, 539 scheduler.getNumGeneralCallsDropped()); 540 541 envEdge.offset = 2000; 542 // now slow calls and the ones to be dropped 543 for (int i = 0; i < 60; i++) { 544 long time = System.currentTimeMillis(); 545 envEdge.timeQ.put(time); 546 CallRunner cr = getMockedCallRunner(time, 100); 547 scheduler.dispatch(cr); 548 } 549 550 // make sure somewhat slow calls are handled 551 waitUntilQueueEmpty(scheduler); 552 Thread.sleep(100); 553 assertTrue( 554 "There should have been at least 12 calls dropped however there were " 555 + scheduler.getNumGeneralCallsDropped(), 556 scheduler.getNumGeneralCallsDropped() > 12); 557 } finally { 558 scheduler.stop(); 559 } 560 } 561 562 // Get mocked call that has the CallRunner sleep for a while so that the fast 563 // path isn't hit. 564 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 565 ServerCall putCall = new ServerCall(1, null, null, 566 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 567 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 568 null, null, 9, null, timestamp, 0, null, null, null) { 569 570 @Override 571 public void sendResponseIfReady() throws IOException { 572 } 573 }; 574 575 CallRunner cr = new CallRunner(null, putCall) { 576 @Override 577 public void run() { 578 if (sleepTime <= 0) return; 579 try { 580 LOG.warn("Sleeping for " + sleepTime); 581 Thread.sleep(sleepTime); 582 LOG.warn("Done Sleeping for " + sleepTime); 583 } catch (InterruptedException e) { 584 } 585 } 586 587 @Override 588 public RpcCall getRpcCall() { 589 return putCall; 590 } 591 592 @Override 593 public void drop() { 594 } 595 }; 596 597 return cr; 598 } 599}