001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.eq;
026import static org.mockito.Mockito.doAnswer;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.timeout;
029import static org.mockito.Mockito.verify;
030import static org.mockito.Mockito.when;
031import java.io.IOException;
032import java.lang.reflect.Field;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.HashSet;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.CountDownLatch;
041import java.util.concurrent.LinkedBlockingQueue;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.Abortable;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.testclassification.RPCTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.EnvironmentEdge;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.Threads;
055import org.junit.Before;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.mockito.Mockito;
062import org.mockito.invocation.InvocationOnMock;
063import org.mockito.stubbing.Answer;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
069import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
074
075@Category({RPCTests.class, MediumTests.class})
076public class TestSimpleRpcScheduler {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080      HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
081
082  @Rule
083  public TestName testName = new TestName();
084
085  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
086
087  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
088    @Override
089    public InetSocketAddress getListenerAddress() {
090      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
091    }
092  };
093  private Configuration conf;
094
095  @Before
096  public void setUp() {
097    conf = HBaseConfiguration.create();
098  }
099
100  @Test
101  public void testBasic() throws IOException, InterruptedException {
102
103    PriorityFunction qosFunction = mock(PriorityFunction.class);
104    RpcScheduler scheduler = new SimpleRpcScheduler(
105        conf, 10, 0, 0, qosFunction, 0);
106    scheduler.init(CONTEXT);
107    scheduler.start();
108    CallRunner task = createMockTask();
109    task.setStatus(new MonitoredRPCHandlerImpl());
110    scheduler.dispatch(task);
111    verify(task, timeout(10000)).run();
112    scheduler.stop();
113  }
114
115  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
116    try {
117      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
118      ExecutorField.setAccessible(true);
119
120      RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler);
121
122      Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass().
123        getDeclaredField("handlerCount");
124
125      handlerCountField.setAccessible(true);
126      handlerCountField.set(rpcExecutor, 0);
127
128      Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass().
129        getDeclaredField("numCallQueues");
130
131      numCallQueuesField.setAccessible(true);
132      numCallQueuesField.set(rpcExecutor, 1);
133
134      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass().
135        getDeclaredField("currentQueueLimit");
136
137      currentQueueLimitField.setAccessible(true);
138      currentQueueLimitField.set(rpcExecutor, 100);
139
140    } catch (NoSuchFieldException e) {
141      LOG.error("No such field exception"+e);
142    } catch (IllegalAccessException e) {
143      LOG.error("Illegal access exception"+e);
144    }
145
146    return scheduler;
147  }
148
149  @Test
150  public void testCallQueueInfo() throws IOException, InterruptedException {
151
152    PriorityFunction qosFunction = mock(PriorityFunction.class);
153    RpcScheduler scheduler = new SimpleRpcScheduler(
154            conf, 0, 0, 0, qosFunction, 0);
155
156    scheduler.init(CONTEXT);
157
158    // Set the handlers to zero. So that number of requests in call Queue can be tested
159    scheduler = disableHandlers(scheduler);
160    scheduler.start();
161
162    int totalCallMethods = 10;
163    for (int i = totalCallMethods; i>0; i--) {
164      CallRunner task = createMockTask();
165      task.setStatus(new MonitoredRPCHandlerImpl());
166      scheduler.dispatch(task);
167    }
168
169
170    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
171
172    for (String callQueueName:callQueueInfo.getCallQueueNames()) {
173
174      for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) {
175        assertEquals(totalCallMethods,
176            callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
177      }
178
179    }
180
181    scheduler.stop();
182
183  }
184
185
186  @Test
187  public void testHandlerIsolation() throws IOException, InterruptedException {
188    CallRunner generalTask = createMockTask();
189    CallRunner priorityTask = createMockTask();
190    CallRunner replicationTask = createMockTask();
191    List<CallRunner> tasks = ImmutableList.of(
192        generalTask,
193        priorityTask,
194        replicationTask);
195    Map<CallRunner, Integer> qos = ImmutableMap.of(
196        generalTask, 0,
197        priorityTask, HConstants.HIGH_QOS + 1,
198        replicationTask, HConstants.REPLICATION_QOS);
199    PriorityFunction qosFunction = mock(PriorityFunction.class);
200    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
201    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
202    Answer<Void> answerToRun = new Answer<Void>() {
203      @Override
204      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
205        synchronized (handlerThreads) {
206          handlerThreads.put(
207              (CallRunner) invocationOnMock.getMock(),
208              Thread.currentThread());
209        }
210        countDownLatch.countDown();
211        return null;
212      }
213    };
214    for (CallRunner task : tasks) {
215      task.setStatus(new MonitoredRPCHandlerImpl());
216      doAnswer(answerToRun).when(task).run();
217    }
218
219    RpcScheduler scheduler = new SimpleRpcScheduler(
220        conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
221    scheduler.init(CONTEXT);
222    scheduler.start();
223    for (CallRunner task : tasks) {
224      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
225      scheduler.dispatch(task);
226    }
227    for (CallRunner task : tasks) {
228      verify(task, timeout(10000)).run();
229    }
230    scheduler.stop();
231
232    // Tests that these requests are handled by three distinct threads.
233    countDownLatch.await();
234    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
235  }
236
237  private CallRunner createMockTask() {
238    ServerCall call = mock(ServerCall.class);
239    CallRunner task = mock(CallRunner.class);
240    when(task.getRpcCall()).thenReturn(call);
241    return task;
242  }
243
244  @Test
245  public void testRpcScheduler() throws Exception {
246    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
247    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
248  }
249
250  private void testRpcScheduler(final String queueType) throws Exception {
251    Configuration schedConf = HBaseConfiguration.create();
252    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
253
254    PriorityFunction priority = mock(PriorityFunction.class);
255    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
256
257    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
258                                                    HConstants.QOS_THRESHOLD);
259    try {
260      scheduler.start();
261
262      CallRunner smallCallTask = mock(CallRunner.class);
263      ServerCall smallCall = mock(ServerCall.class);
264      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
265      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
266      when(smallCall.getHeader()).thenReturn(smallHead);
267
268      CallRunner largeCallTask = mock(CallRunner.class);
269      ServerCall largeCall = mock(ServerCall.class);
270      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
271      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
272      when(largeCall.getHeader()).thenReturn(largeHead);
273
274      CallRunner hugeCallTask = mock(CallRunner.class);
275      ServerCall hugeCall = mock(ServerCall.class);
276      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
277      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
278      when(hugeCall.getHeader()).thenReturn(hugeHead);
279
280      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
281      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
282      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
283
284      final ArrayList<Integer> work = new ArrayList<>();
285      doAnswerTaskExecution(smallCallTask, work, 10, 250);
286      doAnswerTaskExecution(largeCallTask, work, 50, 250);
287      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
288
289      scheduler.dispatch(smallCallTask);
290      scheduler.dispatch(smallCallTask);
291      scheduler.dispatch(smallCallTask);
292      scheduler.dispatch(hugeCallTask);
293      scheduler.dispatch(smallCallTask);
294      scheduler.dispatch(largeCallTask);
295      scheduler.dispatch(smallCallTask);
296      scheduler.dispatch(smallCallTask);
297
298      while (work.size() < 8) {
299        Thread.sleep(100);
300      }
301
302      int seqSum = 0;
303      int totalTime = 0;
304      for (int i = 0; i < work.size(); ++i) {
305        LOG.debug("Request i=" + i + " value=" + work.get(i));
306        seqSum += work.get(i);
307        totalTime += seqSum;
308      }
309      LOG.debug("Total Time: " + totalTime);
310
311      // -> [small small small huge small large small small]
312      // -> NO REORDER   [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
313      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
314      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
315        assertEquals(530, totalTime);
316      } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
317        assertEquals(930, totalTime);
318      }
319    } finally {
320      scheduler.stop();
321    }
322  }
323
324  @Test
325  public void testScanQueueWithZeroScanRatio() throws Exception {
326
327    Configuration schedConf = HBaseConfiguration.create();
328    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
329    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
330    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
331
332    PriorityFunction priority = mock(PriorityFunction.class);
333    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
334
335    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority,
336                                                    HConstants.QOS_THRESHOLD);
337    assertNotEquals(null, scheduler);
338  }
339
340  @Test
341  public void testScanQueues() throws Exception {
342    Configuration schedConf = HBaseConfiguration.create();
343    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
344    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
345    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
346
347    PriorityFunction priority = mock(PriorityFunction.class);
348    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
349
350    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
351                                                    HConstants.QOS_THRESHOLD);
352    try {
353      scheduler.start();
354
355      CallRunner putCallTask = mock(CallRunner.class);
356      ServerCall putCall = mock(ServerCall.class);
357      putCall.param = RequestConverter.buildMutateRequest(
358          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
359      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
360      when(putCallTask.getRpcCall()).thenReturn(putCall);
361      when(putCall.getHeader()).thenReturn(putHead);
362      when(putCall.getParam()).thenReturn(putCall.param);
363
364      CallRunner getCallTask = mock(CallRunner.class);
365      ServerCall getCall = mock(ServerCall.class);
366      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
367      when(getCallTask.getRpcCall()).thenReturn(getCall);
368      when(getCall.getHeader()).thenReturn(getHead);
369
370      CallRunner scanCallTask = mock(CallRunner.class);
371      ServerCall scanCall = mock(ServerCall.class);
372      scanCall.param = ScanRequest.newBuilder().build();
373      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
374      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
375      when(scanCall.getHeader()).thenReturn(scanHead);
376      when(scanCall.getParam()).thenReturn(scanCall.param);
377
378      ArrayList<Integer> work = new ArrayList<>();
379      doAnswerTaskExecution(putCallTask, work, 1, 1000);
380      doAnswerTaskExecution(getCallTask, work, 2, 1000);
381      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
382
383      // There are 3 queues: [puts], [gets], [scans]
384      // so the calls will be interleaved
385      scheduler.dispatch(putCallTask);
386      scheduler.dispatch(putCallTask);
387      scheduler.dispatch(putCallTask);
388      scheduler.dispatch(getCallTask);
389      scheduler.dispatch(getCallTask);
390      scheduler.dispatch(getCallTask);
391      scheduler.dispatch(scanCallTask);
392      scheduler.dispatch(scanCallTask);
393      scheduler.dispatch(scanCallTask);
394
395      while (work.size() < 6) {
396        Thread.sleep(100);
397      }
398
399      for (int i = 0; i < work.size() - 2; i += 3) {
400        assertNotEquals(work.get(i + 0), work.get(i + 1));
401        assertNotEquals(work.get(i + 0), work.get(i + 2));
402        assertNotEquals(work.get(i + 1), work.get(i + 2));
403      }
404    } finally {
405      scheduler.stop();
406    }
407  }
408
409  private void doAnswerTaskExecution(final CallRunner callTask,
410      final ArrayList<Integer> results, final int value, final int sleepInterval) {
411    callTask.setStatus(new MonitoredRPCHandlerImpl());
412    doAnswer(new Answer<Object>() {
413      @Override
414      public Object answer(InvocationOnMock invocation) {
415        synchronized (results) {
416          results.add(value);
417        }
418        Threads.sleepWithoutInterrupt(sleepInterval);
419        return null;
420      }
421    }).when(callTask).run();
422  }
423
424  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
425      throws InterruptedException {
426    while (scheduler.getGeneralQueueLength() > 0) {
427      Thread.sleep(100);
428    }
429  }
430
431  @Test
432  public void testSoftAndHardQueueLimits() throws Exception {
433
434    Configuration schedConf = HBaseConfiguration.create();
435
436    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
437    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
438    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
439      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
440
441    PriorityFunction priority = mock(PriorityFunction.class);
442    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
443    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
444      HConstants.QOS_THRESHOLD);
445    try {
446      scheduler.start();
447
448      CallRunner putCallTask = mock(CallRunner.class);
449      ServerCall putCall = mock(ServerCall.class);
450      putCall.param = RequestConverter.buildMutateRequest(
451        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
452      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
453      when(putCallTask.getRpcCall()).thenReturn(putCall);
454      when(putCall.getHeader()).thenReturn(putHead);
455
456      assertTrue(scheduler.dispatch(putCallTask));
457
458      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
459      scheduler.onConfigurationChange(schedConf);
460      assertFalse(scheduler.dispatch(putCallTask));
461      waitUntilQueueEmpty(scheduler);
462      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
463      scheduler.onConfigurationChange(schedConf);
464      assertTrue(scheduler.dispatch(putCallTask));
465    } finally {
466      scheduler.stop();
467    }
468  }
469
470  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
471
472    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
473
474    private long offset;
475
476    private final Set<String> threadNamePrefixs = new HashSet<>();
477
478    @Override
479    public long currentTime() {
480      for (String threadNamePrefix : threadNamePrefixs) {
481        String threadName = Thread.currentThread().getName();
482        if (threadName.startsWith(threadNamePrefix)) {
483          return timeQ.poll().longValue() + offset;
484        }
485      }
486      return System.currentTimeMillis();
487    }
488  }
489
  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
  // from the get-go, so we are always overloaded. The test below would seem to complete the
  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
  // codel checking. Second, I think this test has been broken since HBASE-16089 (Add on FastPath
  // for CoDel) went in. The thread name we were looking for was the name BEFORE we updated, i.e.
  // "RpcServer.CodelBQ.default.handler", but the same patch changed the name of the codel
  // fastpath thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
  // numCallQueues... Codel is hard to test. This test is going to be flaky given it is all
  // timer-based. Disabling for now till chat with authors.
  // NOTE: the test below still carries @Test, so it is not actually disabled at present.
499  @Test
500  public void testCoDelScheduling() throws Exception {
501    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
502    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
503    Configuration schedConf = HBaseConfiguration.create();
504    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
505    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
506      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
507    PriorityFunction priority = mock(PriorityFunction.class);
508    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
509    SimpleRpcScheduler scheduler =
510        new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
511    try {
512      // Loading mocked call runner can take a good amount of time the first time through
513      // (haven't looked why). Load it for first time here outside of the timed loop.
514      getMockedCallRunner(System.currentTimeMillis(), 2);
515      scheduler.start();
516      EnvironmentEdgeManager.injectEdge(envEdge);
517      envEdge.offset = 5;
518      // Calls faster than min delay
519      // LOG.info("Start");
520      for (int i = 0; i < 100; i++) {
521        long time = System.currentTimeMillis();
522        envEdge.timeQ.put(time);
523        CallRunner cr = getMockedCallRunner(time, 2);
524        // LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr);
525        scheduler.dispatch(cr);
526      }
527      // LOG.info("Loop done");
528      // make sure fast calls are handled
529      waitUntilQueueEmpty(scheduler);
530      Thread.sleep(100);
531      assertEquals("None of these calls should have been discarded", 0,
532        scheduler.getNumGeneralCallsDropped());
533
534      envEdge.offset = 151;
535      // calls slower than min delay, but not individually slow enough to be dropped
536      for (int i = 0; i < 20; i++) {
537        long time = System.currentTimeMillis();
538        envEdge.timeQ.put(time);
539        CallRunner cr = getMockedCallRunner(time, 2);
540        scheduler.dispatch(cr);
541      }
542
543      // make sure somewhat slow calls are handled
544      waitUntilQueueEmpty(scheduler);
545      Thread.sleep(100);
546      assertEquals("None of these calls should have been discarded", 0,
547        scheduler.getNumGeneralCallsDropped());
548
549      envEdge.offset = 2000;
550      // now slow calls and the ones to be dropped
551      for (int i = 0; i < 60; i++) {
552        long time = System.currentTimeMillis();
553        envEdge.timeQ.put(time);
554        CallRunner cr = getMockedCallRunner(time, 100);
555        scheduler.dispatch(cr);
556      }
557
558      // make sure somewhat slow calls are handled
559      waitUntilQueueEmpty(scheduler);
560      Thread.sleep(100);
561      assertTrue(
562          "There should have been at least 12 calls dropped however there were "
563              + scheduler.getNumGeneralCallsDropped(),
564          scheduler.getNumGeneralCallsDropped() > 12);
565    } finally {
566      scheduler.stop();
567    }
568  }
569
570  @Test
571  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
572    String name = testName.getMethodName();
573    int handlerCount = 1;
574    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
575    int maxQueueLength = 0;
576    PriorityFunction priority = mock(PriorityFunction.class);
577    Configuration conf = HBaseConfiguration.create();
578    Abortable abortable = mock(Abortable.class);
579    FastPathBalancedQueueRpcExecutor executor =
580      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name,
581      handlerCount, callQueueType, maxQueueLength, priority, conf, abortable));
582    CallRunner task = mock(CallRunner.class);
583    assertFalse(executor.dispatch(task));
584    //make sure we never internally get a handler, which would skip the queue validation
585    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
586      Mockito.any(), Mockito.any());
587  }
588
589  @Test
590  public void testMetaRWScanQueues() throws Exception {
591    Configuration schedConf = HBaseConfiguration.create();
592    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
593    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
594    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
595
596    PriorityFunction priority = mock(PriorityFunction.class);
597    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
598
599    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority,
600        HConstants.QOS_THRESHOLD);
601    try {
602      scheduler.start();
603
604      CallRunner putCallTask = mock(CallRunner.class);
605      ServerCall putCall = mock(ServerCall.class);
606      putCall.param = RequestConverter.buildMutateRequest(
607          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
608      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
609      when(putCallTask.getRpcCall()).thenReturn(putCall);
610      when(putCall.getHeader()).thenReturn(putHead);
611      when(putCall.getParam()).thenReturn(putCall.param);
612
613      CallRunner getCallTask = mock(CallRunner.class);
614      ServerCall getCall = mock(ServerCall.class);
615      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
616      when(getCallTask.getRpcCall()).thenReturn(getCall);
617      when(getCall.getHeader()).thenReturn(getHead);
618
619      CallRunner scanCallTask = mock(CallRunner.class);
620      ServerCall scanCall = mock(ServerCall.class);
621      scanCall.param = ScanRequest.newBuilder().build();
622      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
623      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
624      when(scanCall.getHeader()).thenReturn(scanHead);
625      when(scanCall.getParam()).thenReturn(scanCall.param);
626
627      ArrayList<Integer> work = new ArrayList<>();
628      doAnswerTaskExecution(putCallTask, work, 1, 1000);
629      doAnswerTaskExecution(getCallTask, work, 2, 1000);
630      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
631
632      // There are 3 queues: [puts], [gets], [scans]
633      // so the calls will be interleaved
634      scheduler.dispatch(putCallTask);
635      scheduler.dispatch(putCallTask);
636      scheduler.dispatch(putCallTask);
637      scheduler.dispatch(getCallTask);
638      scheduler.dispatch(getCallTask);
639      scheduler.dispatch(getCallTask);
640      scheduler.dispatch(scanCallTask);
641      scheduler.dispatch(scanCallTask);
642      scheduler.dispatch(scanCallTask);
643
644      while (work.size() < 6) {
645        Thread.sleep(100);
646      }
647
648      for (int i = 0; i < work.size() - 2; i += 3) {
649        assertNotEquals(work.get(i + 0), work.get(i + 1));
650        assertNotEquals(work.get(i + 0), work.get(i + 2));
651        assertNotEquals(work.get(i + 1), work.get(i + 2));
652      }
653    } finally {
654      scheduler.stop();
655    }
656  }
657
658  // Get mocked call that has the CallRunner sleep for a while so that the fast
659  // path isn't hit.
660  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
661    ServerCall putCall = new ServerCall(1, null, null,
662        RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
663        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
664        null, null, 9, null, timestamp, 0, null, null, null) {
665
666      @Override
667      public void sendResponseIfReady() throws IOException {
668      }
669    };
670
671    CallRunner cr = new CallRunner(null, putCall) {
672      @Override
673      public void run() {
674        if (sleepTime <= 0) {
675          return;
676        }
677        try {
678          LOG.warn("Sleeping for " + sleepTime);
679          Thread.sleep(sleepTime);
680          LOG.warn("Done Sleeping for " + sleepTime);
681        } catch (InterruptedException e) {
682        }
683      }
684
685      @Override
686      public RpcCall getRpcCall() {
687        return putCall;
688      }
689
690      @Override
691      public void drop() {
692      }
693    };
694
695    return cr;
696  }
697}