001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.timeout;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032import java.io.IOException;
033import java.lang.reflect.Field;
034import java.net.InetSocketAddress;
035import java.util.ArrayList;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.BlockingQueue;
041import java.util.concurrent.CountDownLatch;
042import java.util.concurrent.LinkedBlockingQueue;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.Abortable;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.testclassification.RPCTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.EnvironmentEdge;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.Before;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.mockito.Mockito;
063import org.mockito.invocation.InvocationOnMock;
064import org.mockito.stubbing.Answer;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
070import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
075
076@Category({RPCTests.class, MediumTests.class})
077public class TestSimpleRpcScheduler {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081      HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
082
083  @Rule
084  public TestName testName = new TestName();
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
087
088  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
089    @Override
090    public InetSocketAddress getListenerAddress() {
091      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
092    }
093  };
094  private Configuration conf;
095
096  @Before
097  public void setUp() {
098    conf = HBaseConfiguration.create();
099  }
100
101  @Test
102  public void testBasic() throws IOException, InterruptedException {
103
104    PriorityFunction qosFunction = mock(PriorityFunction.class);
105    RpcScheduler scheduler = new SimpleRpcScheduler(
106        conf, 10, 0, 0, qosFunction, 0);
107    scheduler.init(CONTEXT);
108    scheduler.start();
109    CallRunner task = createMockTask();
110    task.setStatus(new MonitoredRPCHandlerImpl());
111    scheduler.dispatch(task);
112    verify(task, timeout(10000)).run();
113    scheduler.stop();
114  }
115
116  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
117    try {
118      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
119      ExecutorField.setAccessible(true);
120
121      RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler);
122
123      Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass().
124        getDeclaredField("handlerCount");
125
126      handlerCountField.setAccessible(true);
127      handlerCountField.set(rpcExecutor, 0);
128
129      Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass().
130        getDeclaredField("numCallQueues");
131
132      numCallQueuesField.setAccessible(true);
133      numCallQueuesField.set(rpcExecutor, 1);
134
135      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass().
136        getDeclaredField("currentQueueLimit");
137
138      currentQueueLimitField.setAccessible(true);
139      currentQueueLimitField.set(rpcExecutor, 100);
140
141    } catch (NoSuchFieldException e) {
142      LOG.error("No such field exception"+e);
143    } catch (IllegalAccessException e) {
144      LOG.error("Illegal access exception"+e);
145    }
146
147    return scheduler;
148  }
149
150  @Test
151  public void testCallQueueInfo() throws IOException, InterruptedException {
152
153    PriorityFunction qosFunction = mock(PriorityFunction.class);
154    RpcScheduler scheduler = new SimpleRpcScheduler(
155            conf, 0, 0, 0, qosFunction, 0);
156
157    scheduler.init(CONTEXT);
158
159    // Set the handlers to zero. So that number of requests in call Queue can be tested
160    scheduler = disableHandlers(scheduler);
161    scheduler.start();
162
163    int totalCallMethods = 10;
164    for (int i = totalCallMethods; i>0; i--) {
165      CallRunner task = createMockTask();
166      task.setStatus(new MonitoredRPCHandlerImpl());
167      scheduler.dispatch(task);
168    }
169
170
171    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
172
173    for (String callQueueName:callQueueInfo.getCallQueueNames()) {
174
175      for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) {
176        assertEquals(totalCallMethods,
177            callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
178      }
179
180    }
181
182    scheduler.stop();
183
184  }
185
186
187  @Test
188  public void testHandlerIsolation() throws IOException, InterruptedException {
189    CallRunner generalTask = createMockTask();
190    CallRunner priorityTask = createMockTask();
191    CallRunner replicationTask = createMockTask();
192    List<CallRunner> tasks = ImmutableList.of(
193        generalTask,
194        priorityTask,
195        replicationTask);
196    Map<CallRunner, Integer> qos = ImmutableMap.of(
197        generalTask, 0,
198        priorityTask, HConstants.HIGH_QOS + 1,
199        replicationTask, HConstants.REPLICATION_QOS);
200    PriorityFunction qosFunction = mock(PriorityFunction.class);
201    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
202    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
203    Answer<Void> answerToRun = new Answer<Void>() {
204      @Override
205      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
206        synchronized (handlerThreads) {
207          handlerThreads.put(
208              (CallRunner) invocationOnMock.getMock(),
209              Thread.currentThread());
210        }
211        countDownLatch.countDown();
212        return null;
213      }
214    };
215    for (CallRunner task : tasks) {
216      task.setStatus(new MonitoredRPCHandlerImpl());
217      doAnswer(answerToRun).when(task).run();
218    }
219
220    RpcScheduler scheduler = new SimpleRpcScheduler(
221        conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
222    scheduler.init(CONTEXT);
223    scheduler.start();
224    for (CallRunner task : tasks) {
225      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
226      scheduler.dispatch(task);
227    }
228    for (CallRunner task : tasks) {
229      verify(task, timeout(10000)).run();
230    }
231    scheduler.stop();
232
233    // Tests that these requests are handled by three distinct threads.
234    countDownLatch.await();
235    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
236  }
237
238  private CallRunner createMockTask() {
239    ServerCall call = mock(ServerCall.class);
240    CallRunner task = mock(CallRunner.class);
241    when(task.getRpcCall()).thenReturn(call);
242    return task;
243  }
244
245  @Test
246  public void testRpcScheduler() throws Exception {
247    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
248    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
249  }
250
251  @Test
252  public void testPluggableRpcQueue() throws Exception {
253    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
254      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
255
256    try {
257      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
258        "MissingClass");
259      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
260    } catch (PluggableRpcQueueNotFound e) {
261      // expected
262    } catch (Exception e) {
263      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
264    }
265
266    try {
267      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
268        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
269      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
270    } catch (PluggableRpcQueueNotFound e) {
271      // expected
272    } catch (Exception e) {
273      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
274    }
275  }
276
277  @Test
278  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
279
280    Configuration schedConf = HBaseConfiguration.create();
281
282    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
283    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
284    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
285      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
286    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
287      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
288
289    PriorityFunction priority = mock(PriorityFunction.class);
290    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
291    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
292      HConstants.QOS_THRESHOLD);
293    try {
294      scheduler.start();
295
296      CallRunner putCallTask = mock(CallRunner.class);
297      ServerCall putCall = mock(ServerCall.class);
298      putCall.param = RequestConverter.buildMutateRequest(
299        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
300      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
301      when(putCallTask.getRpcCall()).thenReturn(putCall);
302      when(putCall.getHeader()).thenReturn(putHead);
303
304      assertTrue(scheduler.dispatch(putCallTask));
305
306      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
307      scheduler.onConfigurationChange(schedConf);
308      assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
309      waitUntilQueueEmpty(scheduler);
310    } finally {
311      scheduler.stop();
312    }
313  }
314
315  private void testRpcScheduler(final String queueType) throws Exception {
316    testRpcScheduler(queueType, null);
317  }
318
319  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
320    throws Exception {
321
322    Configuration schedConf = HBaseConfiguration.create();
323    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
324
325    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
326      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
327    }
328
329    PriorityFunction priority = mock(PriorityFunction.class);
330    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
331
332    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
333                                                    HConstants.QOS_THRESHOLD);
334    try {
335      scheduler.start();
336
337      CallRunner smallCallTask = mock(CallRunner.class);
338      ServerCall smallCall = mock(ServerCall.class);
339      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
340      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
341      when(smallCall.getHeader()).thenReturn(smallHead);
342
343      CallRunner largeCallTask = mock(CallRunner.class);
344      ServerCall largeCall = mock(ServerCall.class);
345      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
346      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
347      when(largeCall.getHeader()).thenReturn(largeHead);
348
349      CallRunner hugeCallTask = mock(CallRunner.class);
350      ServerCall hugeCall = mock(ServerCall.class);
351      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
352      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
353      when(hugeCall.getHeader()).thenReturn(hugeHead);
354
355      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
356      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
357      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
358
359      final ArrayList<Integer> work = new ArrayList<>();
360      doAnswerTaskExecution(smallCallTask, work, 10, 250);
361      doAnswerTaskExecution(largeCallTask, work, 50, 250);
362      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
363
364      scheduler.dispatch(smallCallTask);
365      scheduler.dispatch(smallCallTask);
366      scheduler.dispatch(smallCallTask);
367      scheduler.dispatch(hugeCallTask);
368      scheduler.dispatch(smallCallTask);
369      scheduler.dispatch(largeCallTask);
370      scheduler.dispatch(smallCallTask);
371      scheduler.dispatch(smallCallTask);
372
373      while (work.size() < 8) {
374        Thread.sleep(100);
375      }
376
377      int seqSum = 0;
378      int totalTime = 0;
379      for (int i = 0; i < work.size(); ++i) {
380        LOG.debug("Request i=" + i + " value=" + work.get(i));
381        seqSum += work.get(i);
382        totalTime += seqSum;
383      }
384      LOG.debug("Total Time: " + totalTime);
385
386      // -> [small small small huge small large small small]
387      // -> NO REORDER   [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
388      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
389      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
390        assertEquals(530, totalTime);
391      } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
392        assertEquals(930, totalTime);
393      }
394    } finally {
395      scheduler.stop();
396    }
397  }
398
399  @Test
400  public void testScanQueueWithZeroScanRatio() throws Exception {
401
402    Configuration schedConf = HBaseConfiguration.create();
403    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
404    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
405    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
406
407    PriorityFunction priority = mock(PriorityFunction.class);
408    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
409
410    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority,
411                                                    HConstants.QOS_THRESHOLD);
412    assertNotEquals(null, scheduler);
413  }
414
415  @Test
416  public void testScanQueues() throws Exception {
417    Configuration schedConf = HBaseConfiguration.create();
418    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
419    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
420    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
421
422    PriorityFunction priority = mock(PriorityFunction.class);
423    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
424
425    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
426                                                    HConstants.QOS_THRESHOLD);
427    try {
428      scheduler.start();
429
430      CallRunner putCallTask = mock(CallRunner.class);
431      ServerCall putCall = mock(ServerCall.class);
432      putCall.param = RequestConverter.buildMutateRequest(
433          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
434      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
435      when(putCallTask.getRpcCall()).thenReturn(putCall);
436      when(putCall.getHeader()).thenReturn(putHead);
437      when(putCall.getParam()).thenReturn(putCall.param);
438
439      CallRunner getCallTask = mock(CallRunner.class);
440      ServerCall getCall = mock(ServerCall.class);
441      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
442      when(getCallTask.getRpcCall()).thenReturn(getCall);
443      when(getCall.getHeader()).thenReturn(getHead);
444
445      CallRunner scanCallTask = mock(CallRunner.class);
446      ServerCall scanCall = mock(ServerCall.class);
447      scanCall.param = ScanRequest.newBuilder().build();
448      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
449      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
450      when(scanCall.getHeader()).thenReturn(scanHead);
451      when(scanCall.getParam()).thenReturn(scanCall.param);
452
453      ArrayList<Integer> work = new ArrayList<>();
454      doAnswerTaskExecution(putCallTask, work, 1, 1000);
455      doAnswerTaskExecution(getCallTask, work, 2, 1000);
456      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
457
458      // There are 3 queues: [puts], [gets], [scans]
459      // so the calls will be interleaved
460      scheduler.dispatch(putCallTask);
461      scheduler.dispatch(putCallTask);
462      scheduler.dispatch(putCallTask);
463      scheduler.dispatch(getCallTask);
464      scheduler.dispatch(getCallTask);
465      scheduler.dispatch(getCallTask);
466      scheduler.dispatch(scanCallTask);
467      scheduler.dispatch(scanCallTask);
468      scheduler.dispatch(scanCallTask);
469
470      while (work.size() < 6) {
471        Thread.sleep(100);
472      }
473
474      for (int i = 0; i < work.size() - 2; i += 3) {
475        assertNotEquals(work.get(i + 0), work.get(i + 1));
476        assertNotEquals(work.get(i + 0), work.get(i + 2));
477        assertNotEquals(work.get(i + 1), work.get(i + 2));
478      }
479    } finally {
480      scheduler.stop();
481    }
482  }
483
484  private void doAnswerTaskExecution(final CallRunner callTask,
485      final ArrayList<Integer> results, final int value, final int sleepInterval) {
486    callTask.setStatus(new MonitoredRPCHandlerImpl());
487    doAnswer(new Answer<Object>() {
488      @Override
489      public Object answer(InvocationOnMock invocation) {
490        synchronized (results) {
491          results.add(value);
492        }
493        Threads.sleepWithoutInterrupt(sleepInterval);
494        return null;
495      }
496    }).when(callTask).run();
497  }
498
499  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
500      throws InterruptedException {
501    while (scheduler.getGeneralQueueLength() > 0) {
502      Thread.sleep(100);
503    }
504  }
505
506  @Test
507  public void testSoftAndHardQueueLimits() throws Exception {
508
509    Configuration schedConf = HBaseConfiguration.create();
510
511    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
512    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
513    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
514      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
515
516    PriorityFunction priority = mock(PriorityFunction.class);
517    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
518    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
519      HConstants.QOS_THRESHOLD);
520    try {
521      scheduler.start();
522
523      CallRunner putCallTask = mock(CallRunner.class);
524      ServerCall putCall = mock(ServerCall.class);
525      putCall.param = RequestConverter.buildMutateRequest(
526        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
527      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
528      when(putCallTask.getRpcCall()).thenReturn(putCall);
529      when(putCall.getHeader()).thenReturn(putHead);
530
531      assertTrue(scheduler.dispatch(putCallTask));
532
533      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
534      scheduler.onConfigurationChange(schedConf);
535      assertFalse(scheduler.dispatch(putCallTask));
536      waitUntilQueueEmpty(scheduler);
537      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
538      scheduler.onConfigurationChange(schedConf);
539      assertTrue(scheduler.dispatch(putCallTask));
540    } finally {
541      scheduler.stop();
542    }
543  }
544
545  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
546
547    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
548
549    private long offset;
550
551    private final Set<String> threadNamePrefixs = new HashSet<>();
552
553    @Override
554    public long currentTime() {
555      for (String threadNamePrefix : threadNamePrefixs) {
556        String threadName = Thread.currentThread().getName();
557        if (threadName.startsWith(threadNamePrefix)) {
558          if (timeQ != null) {
559            Long qTime = timeQ.poll();
560            if (qTime != null) {
561              return qTime.longValue() + offset;
562            }
563          }
564        }
565      }
566      return System.currentTimeMillis();
567    }
568  }
569
570  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
571  // from the get go. So we are always overloaded. The test below would seem to complete the
572  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
573  // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for
574  // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e.
575  // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath
576  // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
577  // numCallQueues... Codel is hard to test. This test is going to be flakey given it all
578  // timer-based. Disabling for now till chat with authors.
579  @Test
580  public void testCoDelScheduling() throws Exception {
581    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
582    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
583    Configuration schedConf = HBaseConfiguration.create();
584    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
585    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
586      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
587    PriorityFunction priority = mock(PriorityFunction.class);
588    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
589    SimpleRpcScheduler scheduler =
590        new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
591    try {
592      // Loading mocked call runner can take a good amount of time the first time through
593      // (haven't looked why). Load it for first time here outside of the timed loop.
594      getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2);
595      scheduler.start();
596      EnvironmentEdgeManager.injectEdge(envEdge);
597      envEdge.offset = 5;
598      // Calls faster than min delay
599      // LOG.info("Start");
600      for (int i = 0; i < 100; i++) {
601        long time = EnvironmentEdgeManager.currentTime();
602        envEdge.timeQ.put(time);
603        CallRunner cr = getMockedCallRunner(time, 2);
604        scheduler.dispatch(cr);
605      }
606      // LOG.info("Loop done");
607      // make sure fast calls are handled
608      waitUntilQueueEmpty(scheduler);
609      Thread.sleep(100);
610      assertEquals("None of these calls should have been discarded", 0,
611        scheduler.getNumGeneralCallsDropped());
612
613      envEdge.offset = 151;
614      // calls slower than min delay, but not individually slow enough to be dropped
615      for (int i = 0; i < 20; i++) {
616        long time = EnvironmentEdgeManager.currentTime();
617        envEdge.timeQ.put(time);
618        CallRunner cr = getMockedCallRunner(time, 2);
619        scheduler.dispatch(cr);
620      }
621
622      // make sure somewhat slow calls are handled
623      waitUntilQueueEmpty(scheduler);
624      Thread.sleep(100);
625      assertEquals("None of these calls should have been discarded", 0,
626        scheduler.getNumGeneralCallsDropped());
627
628      envEdge.offset = 2000;
629      // now slow calls and the ones to be dropped
630      for (int i = 0; i < 60; i++) {
631        long time = EnvironmentEdgeManager.currentTime();
632        envEdge.timeQ.put(time);
633        CallRunner cr = getMockedCallRunner(time, 100);
634        scheduler.dispatch(cr);
635      }
636
637      // make sure somewhat slow calls are handled
638      waitUntilQueueEmpty(scheduler);
639      Thread.sleep(100);
640      assertTrue(
641          "There should have been at least 12 calls dropped however there were "
642              + scheduler.getNumGeneralCallsDropped(),
643          scheduler.getNumGeneralCallsDropped() > 12);
644    } finally {
645      scheduler.stop();
646    }
647  }
648
649  @Test
650  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
651    String name = testName.getMethodName();
652    int handlerCount = 1;
653    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
654    int maxQueueLength = 0;
655    PriorityFunction priority = mock(PriorityFunction.class);
656    Configuration conf = HBaseConfiguration.create();
657    Abortable abortable = mock(Abortable.class);
658    FastPathBalancedQueueRpcExecutor executor =
659      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name,
660      handlerCount, callQueueType, maxQueueLength, priority, conf, abortable));
661    CallRunner task = mock(CallRunner.class);
662    assertFalse(executor.dispatch(task));
663    //make sure we never internally get a handler, which would skip the queue validation
664    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
665      Mockito.any(), Mockito.any());
666  }
667
668  @Test
669  public void testMetaRWScanQueues() throws Exception {
670    Configuration schedConf = HBaseConfiguration.create();
671    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
672    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
673    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
674
675    PriorityFunction priority = mock(PriorityFunction.class);
676    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
677
678    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority,
679        HConstants.QOS_THRESHOLD);
680    try {
681      scheduler.start();
682
683      CallRunner putCallTask = mock(CallRunner.class);
684      ServerCall putCall = mock(ServerCall.class);
685      putCall.param = RequestConverter.buildMutateRequest(
686          Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
687      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
688      when(putCallTask.getRpcCall()).thenReturn(putCall);
689      when(putCall.getHeader()).thenReturn(putHead);
690      when(putCall.getParam()).thenReturn(putCall.param);
691
692      CallRunner getCallTask = mock(CallRunner.class);
693      ServerCall getCall = mock(ServerCall.class);
694      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
695      when(getCallTask.getRpcCall()).thenReturn(getCall);
696      when(getCall.getHeader()).thenReturn(getHead);
697
698      CallRunner scanCallTask = mock(CallRunner.class);
699      ServerCall scanCall = mock(ServerCall.class);
700      scanCall.param = ScanRequest.newBuilder().build();
701      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
702      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
703      when(scanCall.getHeader()).thenReturn(scanHead);
704      when(scanCall.getParam()).thenReturn(scanCall.param);
705
706      ArrayList<Integer> work = new ArrayList<>();
707      doAnswerTaskExecution(putCallTask, work, 1, 1000);
708      doAnswerTaskExecution(getCallTask, work, 2, 1000);
709      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
710
711      // There are 3 queues: [puts], [gets], [scans]
712      // so the calls will be interleaved
713      scheduler.dispatch(putCallTask);
714      scheduler.dispatch(putCallTask);
715      scheduler.dispatch(putCallTask);
716      scheduler.dispatch(getCallTask);
717      scheduler.dispatch(getCallTask);
718      scheduler.dispatch(getCallTask);
719      scheduler.dispatch(scanCallTask);
720      scheduler.dispatch(scanCallTask);
721      scheduler.dispatch(scanCallTask);
722
723      while (work.size() < 6) {
724        Thread.sleep(100);
725      }
726
727      for (int i = 0; i < work.size() - 2; i += 3) {
728        assertNotEquals(work.get(i + 0), work.get(i + 1));
729        assertNotEquals(work.get(i + 0), work.get(i + 2));
730        assertNotEquals(work.get(i + 1), work.get(i + 2));
731      }
732    } finally {
733      scheduler.stop();
734    }
735  }
736
737  // Get mocked call that has the CallRunner sleep for a while so that the fast
738  // path isn't hit.
739  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
740    ServerCall putCall = new ServerCall(1, null, null,
741        RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
742        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
743        null, null, 9, null, timestamp, 0, null, null, null) {
744
745      @Override
746      public void sendResponseIfReady() throws IOException {
747      }
748    };
749
750    CallRunner cr = new CallRunner(null, putCall) {
751      @Override
752      public void run() {
753        if (sleepTime <= 0) {
754          return;
755        }
756        try {
757          LOG.warn("Sleeping for " + sleepTime);
758          Thread.sleep(sleepTime);
759          LOG.warn("Done Sleeping for " + sleepTime);
760        } catch (InterruptedException e) {
761        }
762      }
763
764      @Override
765      public RpcCall getRpcCall() {
766        return putCall;
767      }
768
769      @Override
770      public void drop() {
771      }
772    };
773
774    return cr;
775  }
776}