001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.eq;
026import static org.mockito.Mockito.doAnswer;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.timeout;
029import static org.mockito.Mockito.verify;
030import static org.mockito.Mockito.when;
031import java.io.IOException;
032import java.lang.reflect.Field;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.HashSet;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.CountDownLatch;
041import java.util.concurrent.LinkedBlockingQueue;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.testclassification.RPCTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdge;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.Threads;
054import org.junit.Before;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.mockito.invocation.InvocationOnMock;
059import org.mockito.stubbing.Answer;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
064import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
065import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
066import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
070
071@Category({RPCTests.class, MediumTests.class})
072public class TestSimpleRpcScheduler {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
079
080  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
081    @Override
082    public InetSocketAddress getListenerAddress() {
083      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
084    }
085  };
086  private Configuration conf;
087
088  @Before
089  public void setUp() {
090    conf = HBaseConfiguration.create();
091  }
092
093  @Test
094  public void testBasic() throws IOException, InterruptedException {
095
096    PriorityFunction qosFunction = mock(PriorityFunction.class);
097    RpcScheduler scheduler = new SimpleRpcScheduler(
098        conf, 10, 0, 0, qosFunction, 0);
099    scheduler.init(CONTEXT);
100    scheduler.start();
101    CallRunner task = createMockTask();
102    task.setStatus(new MonitoredRPCHandlerImpl());
103    scheduler.dispatch(task);
104    verify(task, timeout(10000)).run();
105    scheduler.stop();
106  }
107
108  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
109    try {
110      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
111      ExecutorField.setAccessible(true);
112
113      RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler);
114
115      Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass().
116        getDeclaredField("handlerCount");
117
118      handlerCountField.setAccessible(true);
119      handlerCountField.set(rpcExecutor, 0);
120
121      Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass().
122        getDeclaredField("numCallQueues");
123
124      numCallQueuesField.setAccessible(true);
125      numCallQueuesField.set(rpcExecutor, 1);
126
127      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass().
128        getDeclaredField("currentQueueLimit");
129
130      currentQueueLimitField.setAccessible(true);
131      currentQueueLimitField.set(rpcExecutor, 100);
132
133    } catch (NoSuchFieldException e) {
134      LOG.error("No such field exception"+e);
135    } catch (IllegalAccessException e) {
136      LOG.error("Illegal access exception"+e);
137    }
138
139    return scheduler;
140  }
141
142  @Test
143  public void testCallQueueInfo() throws IOException, InterruptedException {
144
145    PriorityFunction qosFunction = mock(PriorityFunction.class);
146    RpcScheduler scheduler = new SimpleRpcScheduler(
147            conf, 0, 0, 0, qosFunction, 0);
148
149    scheduler.init(CONTEXT);
150
151    // Set the handlers to zero. So that number of requests in call Queue can be tested
152    scheduler = disableHandlers(scheduler);
153    scheduler.start();
154
155    int totalCallMethods = 10;
156    for (int i = totalCallMethods; i>0; i--) {
157      CallRunner task = createMockTask();
158      task.setStatus(new MonitoredRPCHandlerImpl());
159      scheduler.dispatch(task);
160    }
161
162
163    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
164
165    for (String callQueueName:callQueueInfo.getCallQueueNames()) {
166
167      for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) {
168        assertEquals(totalCallMethods,
169            callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
170      }
171
172    }
173
174    scheduler.stop();
175
176  }
177
178
179  @Test
180  public void testHandlerIsolation() throws IOException, InterruptedException {
181    CallRunner generalTask = createMockTask();
182    CallRunner priorityTask = createMockTask();
183    CallRunner replicationTask = createMockTask();
184    List<CallRunner> tasks = ImmutableList.of(
185        generalTask,
186        priorityTask,
187        replicationTask);
188    Map<CallRunner, Integer> qos = ImmutableMap.of(
189        generalTask, 0,
190        priorityTask, HConstants.HIGH_QOS + 1,
191        replicationTask, HConstants.REPLICATION_QOS);
192    PriorityFunction qosFunction = mock(PriorityFunction.class);
193    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
194    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
195    Answer<Void> answerToRun = new Answer<Void>() {
196      @Override
197      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
198        synchronized (handlerThreads) {
199          handlerThreads.put(
200              (CallRunner) invocationOnMock.getMock(),
201              Thread.currentThread());
202        }
203        countDownLatch.countDown();
204        return null;
205      }
206    };
207    for (CallRunner task : tasks) {
208      task.setStatus(new MonitoredRPCHandlerImpl());
209      doAnswer(answerToRun).when(task).run();
210    }
211
212    RpcScheduler scheduler = new SimpleRpcScheduler(
213        conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
214    scheduler.init(CONTEXT);
215    scheduler.start();
216    for (CallRunner task : tasks) {
217      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
218      scheduler.dispatch(task);
219    }
220    for (CallRunner task : tasks) {
221      verify(task, timeout(10000)).run();
222    }
223    scheduler.stop();
224
225    // Tests that these requests are handled by three distinct threads.
226    countDownLatch.await();
227    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
228  }
229
230  private CallRunner createMockTask() {
231    ServerCall call = mock(ServerCall.class);
232    CallRunner task = mock(CallRunner.class);
233    when(task.getRpcCall()).thenReturn(call);
234    return task;
235  }
236
237  @Test
238  public void testRpcScheduler() throws Exception {
239    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
240    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
241  }
242
243  private void testRpcScheduler(final String queueType) throws Exception {
244    Configuration schedConf = HBaseConfiguration.create();
245    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
246
247    PriorityFunction priority = mock(PriorityFunction.class);
248    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
249
250    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
251                                                    HConstants.QOS_THRESHOLD);
252    try {
253      scheduler.start();
254
255      CallRunner smallCallTask = mock(CallRunner.class);
256      ServerCall smallCall = mock(ServerCall.class);
257      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
258      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
259      when(smallCall.getHeader()).thenReturn(smallHead);
260
261      CallRunner largeCallTask = mock(CallRunner.class);
262      ServerCall largeCall = mock(ServerCall.class);
263      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
264      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
265      when(largeCall.getHeader()).thenReturn(largeHead);
266
267      CallRunner hugeCallTask = mock(CallRunner.class);
268      ServerCall hugeCall = mock(ServerCall.class);
269      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
270      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
271      when(hugeCall.getHeader()).thenReturn(hugeHead);
272
273      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
274      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
275      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
276
277      final ArrayList<Integer> work = new ArrayList<>();
278      doAnswerTaskExecution(smallCallTask, work, 10, 250);
279      doAnswerTaskExecution(largeCallTask, work, 50, 250);
280      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
281
282      scheduler.dispatch(smallCallTask);
283      scheduler.dispatch(smallCallTask);
284      scheduler.dispatch(smallCallTask);
285      scheduler.dispatch(hugeCallTask);
286      scheduler.dispatch(smallCallTask);
287      scheduler.dispatch(largeCallTask);
288      scheduler.dispatch(smallCallTask);
289      scheduler.dispatch(smallCallTask);
290
291      while (work.size() < 8) {
292        Thread.sleep(100);
293      }
294
295      int seqSum = 0;
296      int totalTime = 0;
297      for (int i = 0; i < work.size(); ++i) {
298        LOG.debug("Request i=" + i + " value=" + work.get(i));
299        seqSum += work.get(i);
300        totalTime += seqSum;
301      }
302      LOG.debug("Total Time: " + totalTime);
303
304      // -> [small small small huge small large small small]
305      // -> NO REORDER   [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
306      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
307      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
308        assertEquals(530, totalTime);
309      } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
310        assertEquals(930, totalTime);
311      }
312    } finally {
313      scheduler.stop();
314    }
315  }
316
317  @Test
318  public void testScanQueueWithZeroScanRatio() throws Exception {
319
320    Configuration schedConf = HBaseConfiguration.create();
321    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
322    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
323    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
324
325    PriorityFunction priority = mock(PriorityFunction.class);
326    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
327
328    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority,
329                                                    HConstants.QOS_THRESHOLD);
330    assertNotEquals(null, scheduler);
331  }
332
333  @Test
334  public void testScanQueues() throws Exception {
335    Configuration schedConf = HBaseConfiguration.create();
336    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
337    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
338    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
339
340    PriorityFunction priority = mock(PriorityFunction.class);
341    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
342
343    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
344                                                    HConstants.QOS_THRESHOLD);
345    try {
346      scheduler.start();
347
348      CallRunner putCallTask = mock(CallRunner.class);
349      ServerCall putCall = mock(ServerCall.class);
350      putCall.param = RequestConverter.buildMutateRequest(
351          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
352      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
353      when(putCallTask.getRpcCall()).thenReturn(putCall);
354      when(putCall.getHeader()).thenReturn(putHead);
355      when(putCall.getParam()).thenReturn(putCall.param);
356
357      CallRunner getCallTask = mock(CallRunner.class);
358      ServerCall getCall = mock(ServerCall.class);
359      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
360      when(getCallTask.getRpcCall()).thenReturn(getCall);
361      when(getCall.getHeader()).thenReturn(getHead);
362
363      CallRunner scanCallTask = mock(CallRunner.class);
364      ServerCall scanCall = mock(ServerCall.class);
365      scanCall.param = ScanRequest.newBuilder().build();
366      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
367      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
368      when(scanCall.getHeader()).thenReturn(scanHead);
369      when(scanCall.getParam()).thenReturn(scanCall.param);
370
371      ArrayList<Integer> work = new ArrayList<>();
372      doAnswerTaskExecution(putCallTask, work, 1, 1000);
373      doAnswerTaskExecution(getCallTask, work, 2, 1000);
374      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
375
376      // There are 3 queues: [puts], [gets], [scans]
377      // so the calls will be interleaved
378      scheduler.dispatch(putCallTask);
379      scheduler.dispatch(putCallTask);
380      scheduler.dispatch(putCallTask);
381      scheduler.dispatch(getCallTask);
382      scheduler.dispatch(getCallTask);
383      scheduler.dispatch(getCallTask);
384      scheduler.dispatch(scanCallTask);
385      scheduler.dispatch(scanCallTask);
386      scheduler.dispatch(scanCallTask);
387
388      while (work.size() < 6) {
389        Thread.sleep(100);
390      }
391
392      for (int i = 0; i < work.size() - 2; i += 3) {
393        assertNotEquals(work.get(i + 0), work.get(i + 1));
394        assertNotEquals(work.get(i + 0), work.get(i + 2));
395        assertNotEquals(work.get(i + 1), work.get(i + 2));
396      }
397    } finally {
398      scheduler.stop();
399    }
400  }
401
402  private void doAnswerTaskExecution(final CallRunner callTask,
403      final ArrayList<Integer> results, final int value, final int sleepInterval) {
404    callTask.setStatus(new MonitoredRPCHandlerImpl());
405    doAnswer(new Answer<Object>() {
406      @Override
407      public Object answer(InvocationOnMock invocation) {
408        synchronized (results) {
409          results.add(value);
410        }
411        Threads.sleepWithoutInterrupt(sleepInterval);
412        return null;
413      }
414    }).when(callTask).run();
415  }
416
417  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
418      throws InterruptedException {
419    while (scheduler.getGeneralQueueLength() > 0) {
420      Thread.sleep(100);
421    }
422  }
423
424  @Test
425  public void testSoftAndHardQueueLimits() throws Exception {
426
427    Configuration schedConf = HBaseConfiguration.create();
428
429    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
430    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
431    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
432      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
433
434    PriorityFunction priority = mock(PriorityFunction.class);
435    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
436    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
437      HConstants.QOS_THRESHOLD);
438    try {
439      scheduler.start();
440
441      CallRunner putCallTask = mock(CallRunner.class);
442      ServerCall putCall = mock(ServerCall.class);
443      putCall.param = RequestConverter.buildMutateRequest(
444        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
445      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
446      when(putCallTask.getRpcCall()).thenReturn(putCall);
447      when(putCall.getHeader()).thenReturn(putHead);
448
449      assertTrue(scheduler.dispatch(putCallTask));
450
451      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
452      scheduler.onConfigurationChange(schedConf);
453      assertFalse(scheduler.dispatch(putCallTask));
454      waitUntilQueueEmpty(scheduler);
455      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
456      scheduler.onConfigurationChange(schedConf);
457      assertTrue(scheduler.dispatch(putCallTask));
458    } finally {
459      scheduler.stop();
460    }
461  }
462
463  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
464
465    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
466
467    private long offset;
468
469    private final Set<String> threadNamePrefixs = new HashSet<>();
470
471    @Override
472    public long currentTime() {
473      for (String threadNamePrefix : threadNamePrefixs) {
474        String threadName = Thread.currentThread().getName();
475        if (threadName.startsWith(threadNamePrefix)) {
476          return timeQ.poll().longValue() + offset;
477        }
478      }
479      return System.currentTimeMillis();
480    }
481  }
482
483  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
484  // from the get go. So we are always overloaded. The test below would seem to complete the
485  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
486  // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for
487  // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e.
488  // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath
489  // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
490  // numCallQueues... Codel is hard to test. This test is going to be flakey given it all
491  // timer-based. Disabling for now till chat with authors.
492  @Test
493  public void testCoDelScheduling() throws Exception {
494    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
495    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
496    Configuration schedConf = HBaseConfiguration.create();
497    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
498    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
499      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
500    PriorityFunction priority = mock(PriorityFunction.class);
501    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
502    SimpleRpcScheduler scheduler =
503        new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
504    try {
505      // Loading mocked call runner can take a good amount of time the first time through
506      // (haven't looked why). Load it for first time here outside of the timed loop.
507      getMockedCallRunner(System.currentTimeMillis(), 2);
508      scheduler.start();
509      EnvironmentEdgeManager.injectEdge(envEdge);
510      envEdge.offset = 5;
511      // Calls faster than min delay
512      // LOG.info("Start");
513      for (int i = 0; i < 100; i++) {
514        long time = System.currentTimeMillis();
515        envEdge.timeQ.put(time);
516        CallRunner cr = getMockedCallRunner(time, 2);
517        // LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr);
518        scheduler.dispatch(cr);
519      }
520      // LOG.info("Loop done");
521      // make sure fast calls are handled
522      waitUntilQueueEmpty(scheduler);
523      Thread.sleep(100);
524      assertEquals("None of these calls should have been discarded", 0,
525        scheduler.getNumGeneralCallsDropped());
526
527      envEdge.offset = 151;
528      // calls slower than min delay, but not individually slow enough to be dropped
529      for (int i = 0; i < 20; i++) {
530        long time = System.currentTimeMillis();
531        envEdge.timeQ.put(time);
532        CallRunner cr = getMockedCallRunner(time, 2);
533        scheduler.dispatch(cr);
534      }
535
536      // make sure somewhat slow calls are handled
537      waitUntilQueueEmpty(scheduler);
538      Thread.sleep(100);
539      assertEquals("None of these calls should have been discarded", 0,
540        scheduler.getNumGeneralCallsDropped());
541
542      envEdge.offset = 2000;
543      // now slow calls and the ones to be dropped
544      for (int i = 0; i < 60; i++) {
545        long time = System.currentTimeMillis();
546        envEdge.timeQ.put(time);
547        CallRunner cr = getMockedCallRunner(time, 100);
548        scheduler.dispatch(cr);
549      }
550
551      // make sure somewhat slow calls are handled
552      waitUntilQueueEmpty(scheduler);
553      Thread.sleep(100);
554      assertTrue(
555          "There should have been at least 12 calls dropped however there were "
556              + scheduler.getNumGeneralCallsDropped(),
557          scheduler.getNumGeneralCallsDropped() > 12);
558    } finally {
559      scheduler.stop();
560    }
561  }
562
563  @Test
564  public void testMetaRWScanQueues() throws Exception {
565    Configuration schedConf = HBaseConfiguration.create();
566    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
567    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
568    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
569
570    PriorityFunction priority = mock(PriorityFunction.class);
571    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
572
573    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority,
574        HConstants.QOS_THRESHOLD);
575    try {
576      scheduler.start();
577
578      CallRunner putCallTask = mock(CallRunner.class);
579      ServerCall putCall = mock(ServerCall.class);
580      putCall.param = RequestConverter.buildMutateRequest(
581          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
582      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
583      when(putCallTask.getRpcCall()).thenReturn(putCall);
584      when(putCall.getHeader()).thenReturn(putHead);
585      when(putCall.getParam()).thenReturn(putCall.param);
586
587      CallRunner getCallTask = mock(CallRunner.class);
588      ServerCall getCall = mock(ServerCall.class);
589      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
590      when(getCallTask.getRpcCall()).thenReturn(getCall);
591      when(getCall.getHeader()).thenReturn(getHead);
592
593      CallRunner scanCallTask = mock(CallRunner.class);
594      ServerCall scanCall = mock(ServerCall.class);
595      scanCall.param = ScanRequest.newBuilder().build();
596      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
597      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
598      when(scanCall.getHeader()).thenReturn(scanHead);
599      when(scanCall.getParam()).thenReturn(scanCall.param);
600
601      ArrayList<Integer> work = new ArrayList<>();
602      doAnswerTaskExecution(putCallTask, work, 1, 1000);
603      doAnswerTaskExecution(getCallTask, work, 2, 1000);
604      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
605
606      // There are 3 queues: [puts], [gets], [scans]
607      // so the calls will be interleaved
608      scheduler.dispatch(putCallTask);
609      scheduler.dispatch(putCallTask);
610      scheduler.dispatch(putCallTask);
611      scheduler.dispatch(getCallTask);
612      scheduler.dispatch(getCallTask);
613      scheduler.dispatch(getCallTask);
614      scheduler.dispatch(scanCallTask);
615      scheduler.dispatch(scanCallTask);
616      scheduler.dispatch(scanCallTask);
617
618      while (work.size() < 6) {
619        Thread.sleep(100);
620      }
621
622      for (int i = 0; i < work.size() - 2; i += 3) {
623        assertNotEquals(work.get(i + 0), work.get(i + 1));
624        assertNotEquals(work.get(i + 0), work.get(i + 2));
625        assertNotEquals(work.get(i + 1), work.get(i + 2));
626      }
627    } finally {
628      scheduler.stop();
629    }
630  }
631
632  // Get mocked call that has the CallRunner sleep for a while so that the fast
633  // path isn't hit.
634  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
635    ServerCall putCall = new ServerCall(1, null, null,
636        RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
637        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
638        null, null, 9, null, timestamp, 0, null, null, null) {
639
640      @Override
641      public void sendResponseIfReady() throws IOException {
642      }
643    };
644
645    CallRunner cr = new CallRunner(null, putCall) {
646      @Override
647      public void run() {
648        if (sleepTime <= 0) {
649          return;
650        }
651        try {
652          LOG.warn("Sleeping for " + sleepTime);
653          Thread.sleep(sleepTime);
654          LOG.warn("Done Sleeping for " + sleepTime);
655        } catch (InterruptedException e) {
656        }
657      }
658
659      @Override
660      public RpcCall getRpcCall() {
661        return putCall;
662      }
663
664      @Override
665      public void drop() {
666      }
667    };
668
669    return cr;
670  }
671}