001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.eq;
026import static org.mockito.Mockito.doAnswer;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.timeout;
029import static org.mockito.Mockito.verify;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.lang.reflect.Field;
034import java.net.InetSocketAddress;
035import java.util.ArrayList;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.BlockingQueue;
041import java.util.concurrent.CountDownLatch;
042import java.util.concurrent.LinkedBlockingQueue;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
049import org.apache.hadoop.hbase.testclassification.RPCTests;
050import org.apache.hadoop.hbase.testclassification.SmallTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.EnvironmentEdge;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.Threads;
055import org.junit.Before;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.mockito.invocation.InvocationOnMock;
060import org.mockito.stubbing.Answer;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
065import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
067import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
068
069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
073
074@Category({RPCTests.class, SmallTests.class})
075public class TestSimpleRpcScheduler {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
082
083  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
084    @Override
085    public InetSocketAddress getListenerAddress() {
086      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
087    }
088  };
089  private Configuration conf;
090
091  @Before
092  public void setUp() {
093    conf = HBaseConfiguration.create();
094  }
095
096  @Test
097  public void testBasic() throws IOException, InterruptedException {
098
099    PriorityFunction qosFunction = mock(PriorityFunction.class);
100    RpcScheduler scheduler = new SimpleRpcScheduler(
101        conf, 10, 0, 0, qosFunction, 0);
102    scheduler.init(CONTEXT);
103    scheduler.start();
104    CallRunner task = createMockTask();
105    task.setStatus(new MonitoredRPCHandlerImpl());
106    scheduler.dispatch(task);
107    verify(task, timeout(10000)).run();
108    scheduler.stop();
109  }
110
111  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
112    try {
113      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
114      ExecutorField.setAccessible(true);
115
116      RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler);
117
118      Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
119
120      handlerCountField.setAccessible(true);
121      handlerCountField.set(rpcExecutor, 0);
122
123      Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
124
125      numCallQueuesField.setAccessible(true);
126      numCallQueuesField.set(rpcExecutor, 1);
127
128      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("currentQueueLimit");
129
130      currentQueueLimitField.setAccessible(true);
131      currentQueueLimitField.set(rpcExecutor, 100);
132
133    } catch (NoSuchFieldException e) {
134      LOG.error("No such field exception"+e);
135    } catch (IllegalAccessException e) {
136      LOG.error("Illegal access exception"+e);
137    }
138
139    return scheduler;
140  }
141
142  @Test
143  public void testCallQueueInfo() throws IOException, InterruptedException {
144
145    PriorityFunction qosFunction = mock(PriorityFunction.class);
146    RpcScheduler scheduler = new SimpleRpcScheduler(
147            conf, 0, 0, 0, qosFunction, 0);
148
149    scheduler.init(CONTEXT);
150
151    // Set the handlers to zero. So that number of requests in call Queue can be tested
152    scheduler = disableHandlers(scheduler);
153    scheduler.start();
154
155    int totalCallMethods = 10;
156    for (int i = totalCallMethods; i>0; i--) {
157      CallRunner task = createMockTask();
158      task.setStatus(new MonitoredRPCHandlerImpl());
159      scheduler.dispatch(task);
160    }
161
162
163    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
164
165    for (String callQueueName:callQueueInfo.getCallQueueNames()) {
166
167      for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) {
168        assertEquals(totalCallMethods,
169            callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
170      }
171
172    }
173
174    scheduler.stop();
175
176  }
177
178
179  @Test
180  public void testHandlerIsolation() throws IOException, InterruptedException {
181    CallRunner generalTask = createMockTask();
182    CallRunner priorityTask = createMockTask();
183    CallRunner replicationTask = createMockTask();
184    List<CallRunner> tasks = ImmutableList.of(
185        generalTask,
186        priorityTask,
187        replicationTask);
188    Map<CallRunner, Integer> qos = ImmutableMap.of(
189        generalTask, 0,
190        priorityTask, HConstants.HIGH_QOS + 1,
191        replicationTask, HConstants.REPLICATION_QOS);
192    PriorityFunction qosFunction = mock(PriorityFunction.class);
193    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
194    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
195    Answer<Void> answerToRun = new Answer<Void>() {
196      @Override
197      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
198        synchronized (handlerThreads) {
199          handlerThreads.put(
200              (CallRunner) invocationOnMock.getMock(),
201              Thread.currentThread());
202        }
203        countDownLatch.countDown();
204        return null;
205      }
206    };
207    for (CallRunner task : tasks) {
208      task.setStatus(new MonitoredRPCHandlerImpl());
209      doAnswer(answerToRun).when(task).run();
210    }
211
212    RpcScheduler scheduler = new SimpleRpcScheduler(
213        conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
214    scheduler.init(CONTEXT);
215    scheduler.start();
216    for (CallRunner task : tasks) {
217      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
218      scheduler.dispatch(task);
219    }
220    for (CallRunner task : tasks) {
221      verify(task, timeout(10000)).run();
222    }
223    scheduler.stop();
224
225    // Tests that these requests are handled by three distinct threads.
226    countDownLatch.await();
227    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
228  }
229
230  private CallRunner createMockTask() {
231    ServerCall call = mock(ServerCall.class);
232    CallRunner task = mock(CallRunner.class);
233    when(task.getRpcCall()).thenReturn(call);
234    return task;
235  }
236
237  @Test
238  public void testRpcScheduler() throws Exception {
239    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
240    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
241  }
242
243  private void testRpcScheduler(final String queueType) throws Exception {
244    Configuration schedConf = HBaseConfiguration.create();
245    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
246
247    PriorityFunction priority = mock(PriorityFunction.class);
248    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
249
250    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
251                                                    HConstants.QOS_THRESHOLD);
252    try {
253      scheduler.start();
254
255      CallRunner smallCallTask = mock(CallRunner.class);
256      ServerCall smallCall = mock(ServerCall.class);
257      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
258      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
259      when(smallCall.getHeader()).thenReturn(smallHead);
260
261      CallRunner largeCallTask = mock(CallRunner.class);
262      ServerCall largeCall = mock(ServerCall.class);
263      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
264      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
265      when(largeCall.getHeader()).thenReturn(largeHead);
266
267      CallRunner hugeCallTask = mock(CallRunner.class);
268      ServerCall hugeCall = mock(ServerCall.class);
269      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
270      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
271      when(hugeCall.getHeader()).thenReturn(hugeHead);
272
273      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
274      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
275      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
276
277      final ArrayList<Integer> work = new ArrayList<>();
278      doAnswerTaskExecution(smallCallTask, work, 10, 250);
279      doAnswerTaskExecution(largeCallTask, work, 50, 250);
280      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
281
282      scheduler.dispatch(smallCallTask);
283      scheduler.dispatch(smallCallTask);
284      scheduler.dispatch(smallCallTask);
285      scheduler.dispatch(hugeCallTask);
286      scheduler.dispatch(smallCallTask);
287      scheduler.dispatch(largeCallTask);
288      scheduler.dispatch(smallCallTask);
289      scheduler.dispatch(smallCallTask);
290
291      while (work.size() < 8) {
292        Thread.sleep(100);
293      }
294
295      int seqSum = 0;
296      int totalTime = 0;
297      for (int i = 0; i < work.size(); ++i) {
298        LOG.debug("Request i=" + i + " value=" + work.get(i));
299        seqSum += work.get(i);
300        totalTime += seqSum;
301      }
302      LOG.debug("Total Time: " + totalTime);
303
304      // -> [small small small huge small large small small]
305      // -> NO REORDER   [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
306      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
307      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
308        assertEquals(530, totalTime);
309      } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
310        assertEquals(930, totalTime);
311      }
312    } finally {
313      scheduler.stop();
314    }
315  }
316
317  @Test
318  public void testScanQueueWithZeroScanRatio() throws Exception {
319
320    Configuration schedConf = HBaseConfiguration.create();
321    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
322    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
323    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
324
325    PriorityFunction priority = mock(PriorityFunction.class);
326    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
327
328    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority,
329                                                    HConstants.QOS_THRESHOLD);
330    assertNotEquals(null, scheduler);
331  }
332
333  @Test
334  public void testScanQueues() throws Exception {
335    Configuration schedConf = HBaseConfiguration.create();
336    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
337    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
338    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
339
340    PriorityFunction priority = mock(PriorityFunction.class);
341    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
342
343    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
344                                                    HConstants.QOS_THRESHOLD);
345    try {
346      scheduler.start();
347
348      CallRunner putCallTask = mock(CallRunner.class);
349      ServerCall putCall = mock(ServerCall.class);
350      putCall.param = RequestConverter.buildMutateRequest(
351          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
352      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
353      when(putCallTask.getRpcCall()).thenReturn(putCall);
354      when(putCall.getHeader()).thenReturn(putHead);
355      when(putCall.getParam()).thenReturn(putCall.param);
356
357      CallRunner getCallTask = mock(CallRunner.class);
358      ServerCall getCall = mock(ServerCall.class);
359      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
360      when(getCallTask.getRpcCall()).thenReturn(getCall);
361      when(getCall.getHeader()).thenReturn(getHead);
362
363      CallRunner scanCallTask = mock(CallRunner.class);
364      ServerCall scanCall = mock(ServerCall.class);
365      scanCall.param = ScanRequest.newBuilder().build();
366      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
367      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
368      when(scanCall.getHeader()).thenReturn(scanHead);
369      when(scanCall.getParam()).thenReturn(scanCall.param);
370
371      ArrayList<Integer> work = new ArrayList<>();
372      doAnswerTaskExecution(putCallTask, work, 1, 1000);
373      doAnswerTaskExecution(getCallTask, work, 2, 1000);
374      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
375
376      // There are 3 queues: [puts], [gets], [scans]
377      // so the calls will be interleaved
378      scheduler.dispatch(putCallTask);
379      scheduler.dispatch(putCallTask);
380      scheduler.dispatch(putCallTask);
381      scheduler.dispatch(getCallTask);
382      scheduler.dispatch(getCallTask);
383      scheduler.dispatch(getCallTask);
384      scheduler.dispatch(scanCallTask);
385      scheduler.dispatch(scanCallTask);
386      scheduler.dispatch(scanCallTask);
387
388      while (work.size() < 6) {
389        Thread.sleep(100);
390      }
391
392      for (int i = 0; i < work.size() - 2; i += 3) {
393        assertNotEquals(work.get(i + 0), work.get(i + 1));
394        assertNotEquals(work.get(i + 0), work.get(i + 2));
395        assertNotEquals(work.get(i + 1), work.get(i + 2));
396      }
397    } finally {
398      scheduler.stop();
399    }
400  }
401
402  private void doAnswerTaskExecution(final CallRunner callTask,
403      final ArrayList<Integer> results, final int value, final int sleepInterval) {
404    callTask.setStatus(new MonitoredRPCHandlerImpl());
405    doAnswer(new Answer<Object>() {
406      @Override
407      public Object answer(InvocationOnMock invocation) {
408        synchronized (results) {
409          results.add(value);
410        }
411        Threads.sleepWithoutInterrupt(sleepInterval);
412        return null;
413      }
414    }).when(callTask).run();
415  }
416
417  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
418      throws InterruptedException {
419    while (scheduler.getGeneralQueueLength() > 0) {
420      Thread.sleep(100);
421    }
422  }
423
424  @Test
425  public void testSoftAndHardQueueLimits() throws Exception {
426
427    Configuration schedConf = HBaseConfiguration.create();
428
429    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
430    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
431    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
432      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
433
434    PriorityFunction priority = mock(PriorityFunction.class);
435    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
436    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
437      HConstants.QOS_THRESHOLD);
438    try {
439      scheduler.start();
440
441      CallRunner putCallTask = mock(CallRunner.class);
442      ServerCall putCall = mock(ServerCall.class);
443      putCall.param = RequestConverter.buildMutateRequest(
444        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
445      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
446      when(putCallTask.getRpcCall()).thenReturn(putCall);
447      when(putCall.getHeader()).thenReturn(putHead);
448
449      assertTrue(scheduler.dispatch(putCallTask));
450
451      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
452      scheduler.onConfigurationChange(schedConf);
453      assertFalse(scheduler.dispatch(putCallTask));
454      waitUntilQueueEmpty(scheduler);
455      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
456      scheduler.onConfigurationChange(schedConf);
457      assertTrue(scheduler.dispatch(putCallTask));
458    } finally {
459      scheduler.stop();
460    }
461  }
462
463  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
464
465    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
466
467    private long offset;
468
469    private final Set<String> threadNamePrefixs = new HashSet<>();
470
471    @Override
472    public long currentTime() {
473      for (String threadNamePrefix : threadNamePrefixs) {
474        String threadName = Thread.currentThread().getName();
475        if (threadName.startsWith(threadNamePrefix)) {
476          return timeQ.poll().longValue() + offset;
477        }
478      }
479      return System.currentTimeMillis();
480    }
481  }
482
  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * the CoDel delay from the
  // get-go, so we are always overloaded. The test below would seem to complete the queuing of all the CallRunners
  // inside the CoDel check interval. I don't think we are skipping CoDel checking. Second, I think this test has
  // been broken since "HBASE-16089 Add on FastPath for CoDel" went in. The thread name we were looking for was the
  // name from BEFORE that update, i.e. "RpcServer.CodelBQ.default.handler", but the same patch changed the name of
  // the CoDel fastpath thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues...
  // CoDel is hard to test. This test is going to be flaky given that it is all timer-based. The original note said
  // "Disabling for now till chat with authors", but the test below carries @Test and no @Ignore, so it is enabled.
491  @Test
492  public void testCoDelScheduling() throws Exception {
493    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
494    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
495    Configuration schedConf = HBaseConfiguration.create();
496    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
497    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
498      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
499    PriorityFunction priority = mock(PriorityFunction.class);
500    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
501    SimpleRpcScheduler scheduler =
502        new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
503    try {
504      // Loading mocked call runner can take a good amount of time the first time through (haven't looked why).
505      // Load it for first time here outside of the timed loop.
506      getMockedCallRunner(System.currentTimeMillis(), 2);
507      scheduler.start();
508      EnvironmentEdgeManager.injectEdge(envEdge);
509      envEdge.offset = 5;
510      // Calls faster than min delay
511      // LOG.info("Start");
512      for (int i = 0; i < 100; i++) {
513        long time = System.currentTimeMillis();
514        envEdge.timeQ.put(time);
515        CallRunner cr = getMockedCallRunner(time, 2);
516        // LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr);
517        scheduler.dispatch(cr);
518      }
519      // LOG.info("Loop done");
520      // make sure fast calls are handled
521      waitUntilQueueEmpty(scheduler);
522      Thread.sleep(100);
523      assertEquals("None of these calls should have been discarded", 0,
524        scheduler.getNumGeneralCallsDropped());
525
526      envEdge.offset = 151;
527      // calls slower than min delay, but not individually slow enough to be dropped
528      for (int i = 0; i < 20; i++) {
529        long time = System.currentTimeMillis();
530        envEdge.timeQ.put(time);
531        CallRunner cr = getMockedCallRunner(time, 2);
532        scheduler.dispatch(cr);
533      }
534
535      // make sure somewhat slow calls are handled
536      waitUntilQueueEmpty(scheduler);
537      Thread.sleep(100);
538      assertEquals("None of these calls should have been discarded", 0,
539        scheduler.getNumGeneralCallsDropped());
540
541      envEdge.offset = 2000;
542      // now slow calls and the ones to be dropped
543      for (int i = 0; i < 60; i++) {
544        long time = System.currentTimeMillis();
545        envEdge.timeQ.put(time);
546        CallRunner cr = getMockedCallRunner(time, 100);
547        scheduler.dispatch(cr);
548      }
549
550      // make sure somewhat slow calls are handled
551      waitUntilQueueEmpty(scheduler);
552      Thread.sleep(100);
553      assertTrue(
554          "There should have been at least 12 calls dropped however there were "
555              + scheduler.getNumGeneralCallsDropped(),
556          scheduler.getNumGeneralCallsDropped() > 12);
557    } finally {
558      scheduler.stop();
559    }
560  }
561
562  // Get mocked call that has the CallRunner sleep for a while so that the fast
563  // path isn't hit.
564  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
565    ServerCall putCall = new ServerCall(1, null, null,
566        RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
567        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
568        null, null, 9, null, timestamp, 0, null, null, null) {
569
570      @Override
571      public void sendResponseIfReady() throws IOException {
572      }
573    };
574
575    CallRunner cr = new CallRunner(null, putCall) {
576      @Override
577      public void run() {
578        if (sleepTime <= 0) return;
579        try {
580          LOG.warn("Sleeping for " + sleepTime);
581          Thread.sleep(sleepTime);
582          LOG.warn("Done Sleeping for " + sleepTime);
583        } catch (InterruptedException e) {
584        }
585      }
586
587      @Override
588      public RpcCall getRpcCall() {
589        return putCall;
590      }
591
592      @Override
593      public void drop() {
594      }
595    };
596
597    return cr;
598  }
599}