001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.timeout;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.reflect.Field;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.LinkedBlockingQueue;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.Abortable;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseConfiguration;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RPCTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdge;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.Threads;
057import org.junit.Before;
058import org.junit.ClassRule;
059import org.junit.Rule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.rules.TestName;
063import org.mockito.Mockito;
064import org.mockito.invocation.InvocationOnMock;
065import org.mockito.stubbing.Answer;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
072import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
073
074import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
078
079@Category({ RPCTests.class, MediumTests.class })
080public class TestSimpleRpcScheduler {
081
082  @ClassRule
083  public static final HBaseClassTestRule CLASS_RULE =
084    HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
085
086  @Rule
087  public TestName testName = new TestName();
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
090
091  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
092    @Override
093    public InetSocketAddress getListenerAddress() {
094      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
095    }
096  };
097  private Configuration conf;
098
099  @Before
100  public void setUp() {
101    conf = HBaseConfiguration.create();
102  }
103
104  @Test
105  public void testBasic() throws IOException, InterruptedException {
106
107    PriorityFunction qosFunction = mock(PriorityFunction.class);
108    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
109    scheduler.init(CONTEXT);
110    scheduler.start();
111    CallRunner task = createMockTask();
112    task.setStatus(new MonitoredRPCHandlerImpl());
113    scheduler.dispatch(task);
114    verify(task, timeout(10000)).run();
115    scheduler.stop();
116  }
117
118  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
119    try {
120      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
121      ExecutorField.setAccessible(true);
122
123      RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler);
124
125      Field handlerCountField =
126        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
127
128      handlerCountField.setAccessible(true);
129      handlerCountField.set(rpcExecutor, 0);
130
131      Field numCallQueuesField =
132        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
133
134      numCallQueuesField.setAccessible(true);
135      numCallQueuesField.set(rpcExecutor, 1);
136
137      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass()
138        .getDeclaredField("currentQueueLimit");
139
140      currentQueueLimitField.setAccessible(true);
141      currentQueueLimitField.set(rpcExecutor, 100);
142
143    } catch (NoSuchFieldException e) {
144      LOG.error("No such field exception" + e);
145    } catch (IllegalAccessException e) {
146      LOG.error("Illegal access exception" + e);
147    }
148
149    return scheduler;
150  }
151
152  @Test
153  public void testCallQueueInfo() throws IOException, InterruptedException {
154
155    PriorityFunction qosFunction = mock(PriorityFunction.class);
156    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0);
157
158    scheduler.init(CONTEXT);
159
160    // Set the handlers to zero. So that number of requests in call Queue can be tested
161    scheduler = disableHandlers(scheduler);
162    scheduler.start();
163
164    int totalCallMethods = 10;
165    for (int i = totalCallMethods; i > 0; i--) {
166      CallRunner task = createMockTask();
167      task.setStatus(new MonitoredRPCHandlerImpl());
168      scheduler.dispatch(task);
169    }
170
171    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
172
173    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
174
175      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
176        assertEquals(totalCallMethods,
177          callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
178      }
179
180    }
181
182    scheduler.stop();
183
184  }
185
186  @Test
187  public void testHandlerIsolation() throws IOException, InterruptedException {
188    CallRunner generalTask = createMockTask();
189    CallRunner priorityTask = createMockTask();
190    CallRunner replicationTask = createMockTask();
191    List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
192    Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
193      HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
194    PriorityFunction qosFunction = mock(PriorityFunction.class);
195    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
196    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
197    Answer<Void> answerToRun = new Answer<Void>() {
198      @Override
199      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
200        synchronized (handlerThreads) {
201          handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread());
202        }
203        countDownLatch.countDown();
204        return null;
205      }
206    };
207    for (CallRunner task : tasks) {
208      task.setStatus(new MonitoredRPCHandlerImpl());
209      doAnswer(answerToRun).when(task).run();
210    }
211
212    RpcScheduler scheduler =
213      new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS);
214    scheduler.init(CONTEXT);
215    scheduler.start();
216    for (CallRunner task : tasks) {
217      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
218      scheduler.dispatch(task);
219    }
220    for (CallRunner task : tasks) {
221      verify(task, timeout(10000)).run();
222    }
223    scheduler.stop();
224
225    // Tests that these requests are handled by three distinct threads.
226    countDownLatch.await();
227    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
228  }
229
230  private CallRunner createMockTask() {
231    ServerCall call = mock(ServerCall.class);
232    CallRunner task = mock(CallRunner.class);
233    when(task.getRpcCall()).thenReturn(call);
234    return task;
235  }
236
237  @Test
238  public void testRpcScheduler() throws Exception {
239    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
240    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
241  }
242
243  @Test
244  public void testPluggableRpcQueue() throws Exception {
245    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
246      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
247
248    try {
249      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass");
250      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
251    } catch (PluggableRpcQueueNotFound e) {
252      // expected
253    } catch (Exception e) {
254      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
255    }
256
257    try {
258      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
259        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
260      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
261    } catch (PluggableRpcQueueNotFound e) {
262      // expected
263    } catch (Exception e) {
264      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
265    }
266  }
267
268  @Test
269  public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
270    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
271    Configuration schedConf = HBaseConfiguration.create();
272    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
273    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
274      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
275    schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
276
277    PriorityFunction priority = mock(PriorityFunction.class);
278    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
279    SimpleRpcScheduler scheduler =
280      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
281
282    Field f = scheduler.getClass().getDeclaredField("callExecutor");
283    f.setAccessible(true);
284    assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
285  }
286
287  @Test
288  public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
289    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
290    Configuration schedConf = HBaseConfiguration.create();
291    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
292    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
293      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
294
295    PriorityFunction priority = mock(PriorityFunction.class);
296    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
297    SimpleRpcScheduler scheduler =
298      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
299
300    Field f = scheduler.getClass().getDeclaredField("callExecutor");
301    f.setAccessible(true);
302    assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
303  }
304
305  @Test
306  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
307
308    Configuration schedConf = HBaseConfiguration.create();
309
310    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
311    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
312    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
313      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
314    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
315      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
316
317    PriorityFunction priority = mock(PriorityFunction.class);
318    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
319    SimpleRpcScheduler scheduler =
320      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
321    try {
322      scheduler.start();
323
324      CallRunner putCallTask = mock(CallRunner.class);
325      ServerCall putCall = mock(ServerCall.class);
326      putCall.param =
327        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
328      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
329      when(putCallTask.getRpcCall()).thenReturn(putCall);
330      when(putCall.getHeader()).thenReturn(putHead);
331
332      assertTrue(scheduler.dispatch(putCallTask));
333
334      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
335      scheduler.onConfigurationChange(schedConf);
336      assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
337      waitUntilQueueEmpty(scheduler);
338    } finally {
339      scheduler.stop();
340    }
341  }
342
343  private void testRpcScheduler(final String queueType) throws Exception {
344    testRpcScheduler(queueType, null);
345  }
346
347  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
348    throws Exception {
349    Configuration schedConf = HBaseConfiguration.create();
350    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
351
352    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
353      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
354    }
355
356    PriorityFunction priority = mock(PriorityFunction.class);
357    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
358
359    RpcScheduler scheduler =
360      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
361    try {
362      scheduler.start();
363
364      CallRunner smallCallTask = mock(CallRunner.class);
365      ServerCall smallCall = mock(ServerCall.class);
366      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
367      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
368      when(smallCall.getHeader()).thenReturn(smallHead);
369
370      CallRunner largeCallTask = mock(CallRunner.class);
371      ServerCall largeCall = mock(ServerCall.class);
372      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
373      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
374      when(largeCall.getHeader()).thenReturn(largeHead);
375
376      CallRunner hugeCallTask = mock(CallRunner.class);
377      ServerCall hugeCall = mock(ServerCall.class);
378      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
379      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
380      when(hugeCall.getHeader()).thenReturn(hugeHead);
381
382      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
383      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
384      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
385
386      final ArrayList<Integer> work = new ArrayList<>();
387      doAnswerTaskExecution(smallCallTask, work, 10, 250);
388      doAnswerTaskExecution(largeCallTask, work, 50, 250);
389      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
390
391      scheduler.dispatch(smallCallTask);
392      scheduler.dispatch(smallCallTask);
393      scheduler.dispatch(smallCallTask);
394      scheduler.dispatch(hugeCallTask);
395      scheduler.dispatch(smallCallTask);
396      scheduler.dispatch(largeCallTask);
397      scheduler.dispatch(smallCallTask);
398      scheduler.dispatch(smallCallTask);
399
400      while (work.size() < 8) {
401        Thread.sleep(100);
402      }
403
404      int seqSum = 0;
405      int totalTime = 0;
406      for (int i = 0; i < work.size(); ++i) {
407        LOG.debug("Request i=" + i + " value=" + work.get(i));
408        seqSum += work.get(i);
409        totalTime += seqSum;
410      }
411      LOG.debug("Total Time: " + totalTime);
412
413      // -> [small small small huge small large small small]
414      // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
415      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
416      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
417        assertEquals(530, totalTime);
418      } else if (
419        queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)
420          || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)
421      ) {
422        assertEquals(930, totalTime);
423      }
424    } finally {
425      scheduler.stop();
426    }
427  }
428
429  @Test
430  public void testScanQueueWithZeroScanRatio() throws Exception {
431
432    Configuration schedConf = HBaseConfiguration.create();
433    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
434    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
435    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
436
437    PriorityFunction priority = mock(PriorityFunction.class);
438    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
439
440    RpcScheduler scheduler =
441      new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD);
442    assertNotEquals(null, scheduler);
443  }
444
445  @Test
446  public void testScanQueues() throws Exception {
447    Configuration schedConf = HBaseConfiguration.create();
448    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
449    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
450    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
451
452    PriorityFunction priority = mock(PriorityFunction.class);
453    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
454
455    RpcScheduler scheduler =
456      new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD);
457    try {
458      scheduler.start();
459
460      CallRunner putCallTask = mock(CallRunner.class);
461      ServerCall putCall = mock(ServerCall.class);
462      putCall.param =
463        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
464      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
465      when(putCallTask.getRpcCall()).thenReturn(putCall);
466      when(putCall.getHeader()).thenReturn(putHead);
467      when(putCall.getParam()).thenReturn(putCall.param);
468
469      CallRunner getCallTask = mock(CallRunner.class);
470      ServerCall getCall = mock(ServerCall.class);
471      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
472      when(getCallTask.getRpcCall()).thenReturn(getCall);
473      when(getCall.getHeader()).thenReturn(getHead);
474
475      CallRunner scanCallTask = mock(CallRunner.class);
476      ServerCall scanCall = mock(ServerCall.class);
477      scanCall.param = ScanRequest.newBuilder().build();
478      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
479      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
480      when(scanCall.getHeader()).thenReturn(scanHead);
481      when(scanCall.getParam()).thenReturn(scanCall.param);
482
483      ArrayList<Integer> work = new ArrayList<>();
484      doAnswerTaskExecution(putCallTask, work, 1, 1000);
485      doAnswerTaskExecution(getCallTask, work, 2, 1000);
486      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
487
488      // There are 3 queues: [puts], [gets], [scans]
489      // so the calls will be interleaved
490      scheduler.dispatch(putCallTask);
491      scheduler.dispatch(putCallTask);
492      scheduler.dispatch(putCallTask);
493      scheduler.dispatch(getCallTask);
494      scheduler.dispatch(getCallTask);
495      scheduler.dispatch(getCallTask);
496      scheduler.dispatch(scanCallTask);
497      scheduler.dispatch(scanCallTask);
498      scheduler.dispatch(scanCallTask);
499
500      while (work.size() < 6) {
501        Thread.sleep(100);
502      }
503
504      for (int i = 0; i < work.size() - 2; i += 3) {
505        assertNotEquals(work.get(i + 0), work.get(i + 1));
506        assertNotEquals(work.get(i + 0), work.get(i + 2));
507        assertNotEquals(work.get(i + 1), work.get(i + 2));
508      }
509    } finally {
510      scheduler.stop();
511    }
512  }
513
514  private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
515    final int value, final int sleepInterval) {
516    callTask.setStatus(new MonitoredRPCHandlerImpl());
517    doAnswer(new Answer<Object>() {
518      @Override
519      public Object answer(InvocationOnMock invocation) {
520        synchronized (results) {
521          results.add(value);
522        }
523        Threads.sleepWithoutInterrupt(sleepInterval);
524        return null;
525      }
526    }).when(callTask).run();
527  }
528
529  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
530    throws InterruptedException {
531    while (scheduler.getGeneralQueueLength() > 0) {
532      Thread.sleep(100);
533    }
534  }
535
536  @Test
537  public void testSoftAndHardQueueLimits() throws Exception {
538
539    Configuration schedConf = HBaseConfiguration.create();
540
541    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
542    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
543    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
544      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
545
546    PriorityFunction priority = mock(PriorityFunction.class);
547    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
548    SimpleRpcScheduler scheduler =
549      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
550    try {
551      scheduler.start();
552
553      CallRunner putCallTask = mock(CallRunner.class);
554      ServerCall putCall = mock(ServerCall.class);
555      putCall.param =
556        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
557      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
558      when(putCallTask.getRpcCall()).thenReturn(putCall);
559      when(putCall.getHeader()).thenReturn(putHead);
560
561      assertTrue(scheduler.dispatch(putCallTask));
562
563      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
564      scheduler.onConfigurationChange(schedConf);
565      assertFalse(scheduler.dispatch(putCallTask));
566      waitUntilQueueEmpty(scheduler);
567      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
568      scheduler.onConfigurationChange(schedConf);
569      assertTrue(scheduler.dispatch(putCallTask));
570    } finally {
571      scheduler.stop();
572    }
573  }
574
575  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
576
577    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
578
579    private long offset;
580
581    private final Set<String> threadNamePrefixs = new HashSet<>();
582
583    @Override
584    public long currentTime() {
585      for (String threadNamePrefix : threadNamePrefixs) {
586        String threadName = Thread.currentThread().getName();
587        if (threadName.startsWith(threadNamePrefix)) {
588          if (timeQ != null) {
589            Long qTime = timeQ.poll();
590            if (qTime != null) {
591              return qTime.longValue() + offset;
592            }
593          }
594        }
595      }
596      return System.currentTimeMillis();
597    }
598  }
599
600  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
601  // from the get go. So we are always overloaded. The test below would seem to complete the
602  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
603  // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for
604  // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e.
605  // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath
606  // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
607  // numCallQueues... Codel is hard to test. This test is going to be flakey given it all
608  // timer-based. Disabling for now till chat with authors.
609  @Test
610  public void testCoDelScheduling() throws Exception {
611    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
612    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
613    Configuration schedConf = HBaseConfiguration.create();
614    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
615    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
616      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
617    PriorityFunction priority = mock(PriorityFunction.class);
618    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
619    SimpleRpcScheduler scheduler =
620      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
621    try {
622      // Loading mocked call runner can take a good amount of time the first time through
623      // (haven't looked why). Load it for first time here outside of the timed loop.
624      getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2);
625      scheduler.start();
626      EnvironmentEdgeManager.injectEdge(envEdge);
627      envEdge.offset = 5;
628      // Calls faster than min delay
629      // LOG.info("Start");
630      for (int i = 0; i < 100; i++) {
631        long time = EnvironmentEdgeManager.currentTime();
632        envEdge.timeQ.put(time);
633        CallRunner cr = getMockedCallRunner(time, 2);
634        scheduler.dispatch(cr);
635      }
636      // LOG.info("Loop done");
637      // make sure fast calls are handled
638      waitUntilQueueEmpty(scheduler);
639      Thread.sleep(100);
640      assertEquals("None of these calls should have been discarded", 0,
641        scheduler.getNumGeneralCallsDropped());
642
643      envEdge.offset = 151;
644      // calls slower than min delay, but not individually slow enough to be dropped
645      for (int i = 0; i < 20; i++) {
646        long time = EnvironmentEdgeManager.currentTime();
647        envEdge.timeQ.put(time);
648        CallRunner cr = getMockedCallRunner(time, 2);
649        scheduler.dispatch(cr);
650      }
651
652      // make sure somewhat slow calls are handled
653      waitUntilQueueEmpty(scheduler);
654      Thread.sleep(100);
655      assertEquals("None of these calls should have been discarded", 0,
656        scheduler.getNumGeneralCallsDropped());
657
658      envEdge.offset = 2000;
659      // now slow calls and the ones to be dropped
660      for (int i = 0; i < 60; i++) {
661        long time = EnvironmentEdgeManager.currentTime();
662        envEdge.timeQ.put(time);
663        CallRunner cr = getMockedCallRunner(time, 100);
664        scheduler.dispatch(cr);
665      }
666
667      // make sure somewhat slow calls are handled
668      waitUntilQueueEmpty(scheduler);
669      Thread.sleep(100);
670      assertTrue("There should have been at least 12 calls dropped however there were "
671        + scheduler.getNumGeneralCallsDropped(), scheduler.getNumGeneralCallsDropped() > 12);
672    } finally {
673      scheduler.stop();
674    }
675  }
676
677  @Test
678  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
679    String name = testName.getMethodName();
680    int handlerCount = 1;
681    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
682    int maxQueueLength = 0;
683    PriorityFunction priority = mock(PriorityFunction.class);
684    Configuration conf = HBaseConfiguration.create();
685    Abortable abortable = mock(Abortable.class);
686    FastPathBalancedQueueRpcExecutor executor =
687      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType,
688        maxQueueLength, priority, conf, abortable));
689    CallRunner task = mock(CallRunner.class);
690    assertFalse(executor.dispatch(task));
691    // make sure we never internally get a handler, which would skip the queue validation
692    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
693      Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
694  }
695
696  @Test
697  public void testMetaRWScanQueues() throws Exception {
698    Configuration schedConf = HBaseConfiguration.create();
699    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
700    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
701    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
702
703    PriorityFunction priority = mock(PriorityFunction.class);
704    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
705
706    RpcScheduler scheduler =
707      new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD);
708    try {
709      scheduler.start();
710
711      CallRunner putCallTask = mock(CallRunner.class);
712      ServerCall putCall = mock(ServerCall.class);
713      putCall.param =
714        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
715      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
716      when(putCallTask.getRpcCall()).thenReturn(putCall);
717      when(putCall.getHeader()).thenReturn(putHead);
718      when(putCall.getParam()).thenReturn(putCall.param);
719
720      CallRunner getCallTask = mock(CallRunner.class);
721      ServerCall getCall = mock(ServerCall.class);
722      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
723      when(getCallTask.getRpcCall()).thenReturn(getCall);
724      when(getCall.getHeader()).thenReturn(getHead);
725
726      CallRunner scanCallTask = mock(CallRunner.class);
727      ServerCall scanCall = mock(ServerCall.class);
728      scanCall.param = ScanRequest.newBuilder().build();
729      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
730      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
731      when(scanCall.getHeader()).thenReturn(scanHead);
732      when(scanCall.getParam()).thenReturn(scanCall.param);
733
734      ArrayList<Integer> work = new ArrayList<>();
735      doAnswerTaskExecution(putCallTask, work, 1, 1000);
736      doAnswerTaskExecution(getCallTask, work, 2, 1000);
737      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
738
739      // There are 3 queues: [puts], [gets], [scans]
740      // so the calls will be interleaved
741      scheduler.dispatch(putCallTask);
742      scheduler.dispatch(putCallTask);
743      scheduler.dispatch(putCallTask);
744      scheduler.dispatch(getCallTask);
745      scheduler.dispatch(getCallTask);
746      scheduler.dispatch(getCallTask);
747      scheduler.dispatch(scanCallTask);
748      scheduler.dispatch(scanCallTask);
749      scheduler.dispatch(scanCallTask);
750
751      while (work.size() < 6) {
752        Thread.sleep(100);
753      }
754
755      for (int i = 0; i < work.size() - 2; i += 3) {
756        assertNotEquals(work.get(i + 0), work.get(i + 1));
757        assertNotEquals(work.get(i + 0), work.get(i + 2));
758        assertNotEquals(work.get(i + 1), work.get(i + 2));
759      }
760    } finally {
761      scheduler.stop();
762    }
763  }
764
765  // Get mocked call that has the CallRunner sleep for a while so that the fast
766  // path isn't hit.
767  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
768    ServerCall putCall = new ServerCall(1, null, null,
769      RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
770      RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
771      null, null, 9, null, timestamp, 0, null, null, null) {
772
773      @Override
774      public void sendResponseIfReady() throws IOException {
775      }
776    };
777
778    return new CallRunner(null, putCall) {
779      @Override
780      public void run() {
781        if (sleepTime <= 0) {
782          return;
783        }
784        try {
785          LOG.warn("Sleeping for " + sleepTime);
786          Thread.sleep(sleepTime);
787          LOG.warn("Done Sleeping for " + sleepTime);
788        } catch (InterruptedException e) {
789        }
790      }
791
792      @Override
793      public RpcCall getRpcCall() {
794        return putCall;
795      }
796
797      @Override
798      public void drop() {
799      }
800    };
801  }
802}