001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.timeout;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.reflect.Field;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.LinkedBlockingQueue;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.Abortable;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseConfiguration;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RPCTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdge;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.Threads;
057import org.junit.Before;
058import org.junit.ClassRule;
059import org.junit.Rule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.rules.TestName;
063import org.mockito.Mockito;
064import org.mockito.invocation.InvocationOnMock;
065import org.mockito.stubbing.Answer;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
072import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
073
074import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
078
079@Category({ RPCTests.class, MediumTests.class })
080public class TestSimpleRpcScheduler {
081
082  @ClassRule
083  public static final HBaseClassTestRule CLASS_RULE =
084    HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
085
086  @Rule
087  public TestName testName = new TestName();
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
090
  // Minimal scheduler context passed to RpcScheduler.init() by the tests below; it only reports
  // a fixed, unresolved listener address (127.0.0.1:1000).
  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
    @Override
    public InetSocketAddress getListenerAddress() {
      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
    }
  };
097  private Configuration conf;
098
099  @Before
100  public void setUp() {
101    conf = HBaseConfiguration.create();
102  }
103
104  @Test
105  public void testBasic() throws IOException, InterruptedException {
106
107    PriorityFunction qosFunction = mock(PriorityFunction.class);
108    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
109    scheduler.init(CONTEXT);
110    scheduler.start();
111    CallRunner task = createMockTask();
112    task.setStatus(new MonitoredRPCHandlerImpl("test"));
113    scheduler.dispatch(task);
114    verify(task, timeout(10000)).run();
115    scheduler.stop();
116  }
117
118  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
119    try {
120      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
121      ExecutorField.setAccessible(true);
122
123      RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler);
124
125      Field handlerCountField =
126        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
127
128      handlerCountField.setAccessible(true);
129      handlerCountField.set(rpcExecutor, 0);
130
131      Field numCallQueuesField =
132        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
133
134      numCallQueuesField.setAccessible(true);
135      numCallQueuesField.set(rpcExecutor, 1);
136
137      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass()
138        .getDeclaredField("currentQueueLimit");
139
140      currentQueueLimitField.setAccessible(true);
141      currentQueueLimitField.set(rpcExecutor, 100);
142
143    } catch (NoSuchFieldException e) {
144      LOG.error("No such field exception" + e);
145    } catch (IllegalAccessException e) {
146      LOG.error("Illegal access exception" + e);
147    }
148
149    return scheduler;
150  }
151
152  @Test
153  public void testCallQueueInfo() throws IOException, InterruptedException {
154
155    PriorityFunction qosFunction = mock(PriorityFunction.class);
156    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0);
157
158    scheduler.init(CONTEXT);
159
160    // Set the handlers to zero. So that number of requests in call Queue can be tested
161    scheduler = disableHandlers(scheduler);
162    scheduler.start();
163
164    int totalCallMethods = 10;
165    for (int i = totalCallMethods; i > 0; i--) {
166      CallRunner task = createMockTask();
167      task.setStatus(new MonitoredRPCHandlerImpl("test"));
168      scheduler.dispatch(task);
169    }
170
171    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
172
173    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
174
175      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
176        assertEquals(totalCallMethods,
177          callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
178      }
179
180    }
181
182    scheduler.stop();
183
184  }
185
186  @Test
187  public void testHandlerIsolation() throws IOException, InterruptedException {
188    CallRunner generalTask = createMockTask();
189    CallRunner priorityTask = createMockTask();
190    CallRunner replicationTask = createMockTask();
191    List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
192    Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
193      HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
194    PriorityFunction qosFunction = mock(PriorityFunction.class);
195    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
196    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
197    Answer<Void> answerToRun = new Answer<Void>() {
198      @Override
199      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
200        synchronized (handlerThreads) {
201          handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread());
202        }
203        countDownLatch.countDown();
204        return null;
205      }
206    };
207    for (CallRunner task : tasks) {
208      task.setStatus(new MonitoredRPCHandlerImpl("test"));
209      doAnswer(answerToRun).when(task).run();
210    }
211
212    RpcScheduler scheduler =
213      new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS);
214    scheduler.init(CONTEXT);
215    scheduler.start();
216    for (CallRunner task : tasks) {
217      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
218      scheduler.dispatch(task);
219    }
220    for (CallRunner task : tasks) {
221      verify(task, timeout(10000)).run();
222    }
223    scheduler.stop();
224
225    // Tests that these requests are handled by three distinct threads.
226    countDownLatch.await();
227    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
228  }
229
230  private CallRunner createMockTask() {
231    ServerCall call = mock(ServerCall.class);
232    CallRunner task = mock(CallRunner.class);
233    when(task.getRpcCall()).thenReturn(call);
234    return task;
235  }
236
237  @Test
238  public void testRpcScheduler() throws Exception {
239    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
240    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
241  }
242
243  @Test
244  public void testPluggableRpcQueue() throws Exception {
245    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
246      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
247
248    try {
249      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass");
250      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
251    } catch (PluggableRpcQueueNotFound e) {
252      // expected
253    } catch (Exception e) {
254      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
255    }
256
257    try {
258      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
259        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
260      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
261    } catch (PluggableRpcQueueNotFound e) {
262      // expected
263    } catch (Exception e) {
264      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
265    }
266  }
267
268  @Test
269  public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
270    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
271    Configuration schedConf = HBaseConfiguration.create();
272    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
273    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
274      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
275    schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
276
277    PriorityFunction priority = mock(PriorityFunction.class);
278    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
279    SimpleRpcScheduler scheduler =
280      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
281
282    Field f = scheduler.getClass().getDeclaredField("callExecutor");
283    f.setAccessible(true);
284    assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
285  }
286
287  @Test
288  public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
289    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
290    Configuration schedConf = HBaseConfiguration.create();
291    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
292    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
293      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
294
295    PriorityFunction priority = mock(PriorityFunction.class);
296    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
297    SimpleRpcScheduler scheduler =
298      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
299
300    Field f = scheduler.getClass().getDeclaredField("callExecutor");
301    f.setAccessible(true);
302    assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
303  }
304
305  @Test
306  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
307
308    Configuration schedConf = HBaseConfiguration.create();
309
310    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
311    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
312    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
313      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
314    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
315      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
316
317    PriorityFunction priority = mock(PriorityFunction.class);
318    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
319    SimpleRpcScheduler scheduler =
320      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
321    try {
322      scheduler.start();
323
324      CallRunner putCallTask = mock(CallRunner.class);
325      ServerCall putCall = mock(ServerCall.class);
326      putCall.param =
327        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
328      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
329      when(putCallTask.getRpcCall()).thenReturn(putCall);
330      when(putCall.getHeader()).thenReturn(putHead);
331
332      assertTrue(scheduler.dispatch(putCallTask));
333
334      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
335      scheduler.onConfigurationChange(schedConf);
336      assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
337      waitUntilQueueEmpty(scheduler);
338    } finally {
339      scheduler.stop();
340    }
341  }
342
343  private void testRpcScheduler(final String queueType) throws Exception {
344    testRpcScheduler(queueType, null);
345  }
346
347  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
348    throws Exception {
349    Configuration schedConf = HBaseConfiguration.create();
350    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
351
352    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
353      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
354    }
355
356    PriorityFunction priority = mock(PriorityFunction.class);
357    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
358
359    RpcScheduler scheduler =
360      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
361    try {
362      scheduler.start();
363
364      CallRunner smallCallTask = mock(CallRunner.class);
365      ServerCall smallCall = mock(ServerCall.class);
366      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
367      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
368      when(smallCall.getHeader()).thenReturn(smallHead);
369
370      CallRunner largeCallTask = mock(CallRunner.class);
371      ServerCall largeCall = mock(ServerCall.class);
372      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
373      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
374      when(largeCall.getHeader()).thenReturn(largeHead);
375
376      CallRunner hugeCallTask = mock(CallRunner.class);
377      ServerCall hugeCall = mock(ServerCall.class);
378      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
379      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
380      when(hugeCall.getHeader()).thenReturn(hugeHead);
381
382      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
383      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
384      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
385
386      final ArrayList<Integer> work = new ArrayList<>();
387      doAnswerTaskExecution(smallCallTask, work, 10, 250);
388      doAnswerTaskExecution(largeCallTask, work, 50, 250);
389      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
390
391      scheduler.dispatch(smallCallTask);
392      scheduler.dispatch(smallCallTask);
393      scheduler.dispatch(smallCallTask);
394      scheduler.dispatch(hugeCallTask);
395      scheduler.dispatch(smallCallTask);
396      scheduler.dispatch(largeCallTask);
397      scheduler.dispatch(smallCallTask);
398      scheduler.dispatch(smallCallTask);
399
400      while (work.size() < 8) {
401        Thread.sleep(100);
402      }
403
404      int seqSum = 0;
405      int totalTime = 0;
406      for (int i = 0; i < work.size(); ++i) {
407        LOG.debug("Request i=" + i + " value=" + work.get(i));
408        seqSum += work.get(i);
409        totalTime += seqSum;
410      }
411      LOG.debug("Total Time: " + totalTime);
412
413      // -> [small small small huge small large small small]
414      // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
415      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
416      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
417        assertEquals(530, totalTime);
418      } else if (
419        queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)
420          || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)
421      ) {
422        assertEquals(930, totalTime);
423      }
424    } finally {
425      scheduler.stop();
426    }
427  }
428
429  @Test
430  public void testScanQueueWithZeroScanRatio() throws Exception {
431
432    Configuration schedConf = HBaseConfiguration.create();
433    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
434    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
435    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
436
437    PriorityFunction priority = mock(PriorityFunction.class);
438    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
439
440    RpcScheduler scheduler =
441      new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD);
442    assertNotEquals(null, scheduler);
443  }
444
445  @Test
446  public void testScanQueues() throws Exception {
447    Configuration schedConf = HBaseConfiguration.create();
448    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
449    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
450    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
451
452    PriorityFunction priority = mock(PriorityFunction.class);
453    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
454
455    RpcScheduler scheduler =
456      new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD);
457    try {
458      scheduler.start();
459
460      CallRunner putCallTask = mock(CallRunner.class);
461      ServerCall putCall = mock(ServerCall.class);
462      putCall.param =
463        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
464      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
465      when(putCallTask.getRpcCall()).thenReturn(putCall);
466      when(putCall.getHeader()).thenReturn(putHead);
467      when(putCall.getParam()).thenReturn(putCall.param);
468
469      CallRunner getCallTask = mock(CallRunner.class);
470      ServerCall getCall = mock(ServerCall.class);
471      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
472      when(getCallTask.getRpcCall()).thenReturn(getCall);
473      when(getCall.getHeader()).thenReturn(getHead);
474
475      CallRunner scanCallTask = mock(CallRunner.class);
476      ServerCall scanCall = mock(ServerCall.class);
477      scanCall.param = ScanRequest.newBuilder().build();
478      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
479      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
480      when(scanCall.getHeader()).thenReturn(scanHead);
481      when(scanCall.getParam()).thenReturn(scanCall.param);
482
483      CallRunner bulkLoadCallTask = mock(CallRunner.class);
484      ServerCall bulkLoadCall = mock(ServerCall.class);
485      bulkLoadCall.param = ScanRequest.newBuilder().build();
486      RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build();
487      when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
488      when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
489      when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
490
491      ArrayList<Integer> work = new ArrayList<>();
492      doAnswerTaskExecution(putCallTask, work, 1, 1000);
493      doAnswerTaskExecution(getCallTask, work, 2, 1000);
494      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
495      doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
496
497      // There are 3 queues: [puts], [gets], [scans], [bulkload]
498      // so the calls will be interleaved
499      scheduler.dispatch(putCallTask);
500      scheduler.dispatch(putCallTask);
501      scheduler.dispatch(putCallTask);
502      scheduler.dispatch(getCallTask);
503      scheduler.dispatch(getCallTask);
504      scheduler.dispatch(getCallTask);
505      scheduler.dispatch(scanCallTask);
506      scheduler.dispatch(scanCallTask);
507      scheduler.dispatch(scanCallTask);
508      scheduler.dispatch(bulkLoadCallTask);
509      scheduler.dispatch(bulkLoadCallTask);
510      scheduler.dispatch(bulkLoadCallTask);
511      while (work.size() < 6) {
512        Thread.sleep(100);
513      }
514
515      for (int i = 0; i < work.size() - 2; i += 3) {
516        assertNotEquals(work.get(i + 0), work.get(i + 1));
517        assertNotEquals(work.get(i + 0), work.get(i + 2));
518        assertNotEquals(work.get(i + 1), work.get(i + 2));
519      }
520    } finally {
521      scheduler.stop();
522    }
523  }
524
525  private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
526    final int value, final int sleepInterval) {
527    callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
528    doAnswer(new Answer<Object>() {
529      @Override
530      public Object answer(InvocationOnMock invocation) {
531        synchronized (results) {
532          results.add(value);
533        }
534        Threads.sleepWithoutInterrupt(sleepInterval);
535        return null;
536      }
537    }).when(callTask).run();
538  }
539
540  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
541    throws InterruptedException {
542    while (scheduler.getGeneralQueueLength() > 0) {
543      Thread.sleep(100);
544    }
545  }
546
547  @Test
548  public void testSoftAndHardQueueLimits() throws Exception {
549
550    Configuration schedConf = HBaseConfiguration.create();
551
552    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
553    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
554    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
555      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
556
557    PriorityFunction priority = mock(PriorityFunction.class);
558    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
559    SimpleRpcScheduler scheduler =
560      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
561    try {
562      scheduler.start();
563
564      CallRunner putCallTask = mock(CallRunner.class);
565      ServerCall putCall = mock(ServerCall.class);
566      putCall.param =
567        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
568      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
569      when(putCallTask.getRpcCall()).thenReturn(putCall);
570      when(putCall.getHeader()).thenReturn(putHead);
571
572      assertTrue(scheduler.dispatch(putCallTask));
573
574      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
575      scheduler.onConfigurationChange(schedConf);
576      assertFalse(scheduler.dispatch(putCallTask));
577      waitUntilQueueEmpty(scheduler);
578      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
579      scheduler.onConfigurationChange(schedConf);
580      assertTrue(scheduler.dispatch(putCallTask));
581    } finally {
582      scheduler.stop();
583    }
584  }
585
586  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
587
588    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
589
590    private long offset;
591
592    private final Set<String> threadNamePrefixs = new HashSet<>();
593
594    @Override
595    public long currentTime() {
596      for (String threadNamePrefix : threadNamePrefixs) {
597        String threadName = Thread.currentThread().getName();
598        if (threadName.startsWith(threadNamePrefix)) {
599          if (timeQ != null) {
600            Long qTime = timeQ.poll();
601            if (qTime != null) {
602              return qTime.longValue() + offset;
603            }
604          }
605        }
606      }
607      return System.currentTimeMillis();
608    }
609  }
610
611  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
612  // from the get go. So we are always overloaded. The test below would seem to complete the
613  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
614  // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for
615  // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e.
616  // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath
617  // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
618  // numCallQueues... Codel is hard to test. This test is going to be flakey given it all
619  // timer-based. Disabling for now till chat with authors.
620  @Test
621  public void testCoDelScheduling() throws Exception {
622    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
623    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
624    Configuration schedConf = HBaseConfiguration.create();
625    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
626    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
627      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
628    PriorityFunction priority = mock(PriorityFunction.class);
629    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
630    SimpleRpcScheduler scheduler =
631      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
632    try {
633      // Loading mocked call runner can take a good amount of time the first time through
634      // (haven't looked why). Load it for first time here outside of the timed loop.
635      getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2);
636      scheduler.start();
637      EnvironmentEdgeManager.injectEdge(envEdge);
638      envEdge.offset = 5;
639      // Calls faster than min delay
640      // LOG.info("Start");
641      for (int i = 0; i < 100; i++) {
642        long time = EnvironmentEdgeManager.currentTime();
643        envEdge.timeQ.put(time);
644        CallRunner cr = getMockedCallRunner(time, 2);
645        scheduler.dispatch(cr);
646      }
647      // LOG.info("Loop done");
648      // make sure fast calls are handled
649      waitUntilQueueEmpty(scheduler);
650      Thread.sleep(100);
651      assertEquals("None of these calls should have been discarded", 0,
652        scheduler.getNumGeneralCallsDropped());
653
654      envEdge.offset = 151;
655      // calls slower than min delay, but not individually slow enough to be dropped
656      for (int i = 0; i < 20; i++) {
657        long time = EnvironmentEdgeManager.currentTime();
658        envEdge.timeQ.put(time);
659        CallRunner cr = getMockedCallRunner(time, 2);
660        scheduler.dispatch(cr);
661      }
662
663      // make sure somewhat slow calls are handled
664      waitUntilQueueEmpty(scheduler);
665      Thread.sleep(100);
666      assertEquals("None of these calls should have been discarded", 0,
667        scheduler.getNumGeneralCallsDropped());
668
669      envEdge.offset = 2000;
670      // now slow calls and the ones to be dropped
671      for (int i = 0; i < 60; i++) {
672        long time = EnvironmentEdgeManager.currentTime();
673        envEdge.timeQ.put(time);
674        CallRunner cr = getMockedCallRunner(time, 100);
675        scheduler.dispatch(cr);
676      }
677
678      // make sure somewhat slow calls are handled
679      waitUntilQueueEmpty(scheduler);
680      Thread.sleep(100);
681      assertTrue("There should have been at least 12 calls dropped however there were "
682        + scheduler.getNumGeneralCallsDropped(), scheduler.getNumGeneralCallsDropped() > 12);
683    } finally {
684      scheduler.stop();
685    }
686  }
687
688  @Test
689  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
690    String name = testName.getMethodName();
691    int handlerCount = 1;
692    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
693    int maxQueueLength = 0;
694    PriorityFunction priority = mock(PriorityFunction.class);
695    Configuration conf = HBaseConfiguration.create();
696    Abortable abortable = mock(Abortable.class);
697    FastPathBalancedQueueRpcExecutor executor =
698      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType,
699        maxQueueLength, priority, conf, abortable));
700    CallRunner task = mock(CallRunner.class);
701    assertFalse(executor.dispatch(task));
702    // make sure we never internally get a handler, which would skip the queue validation
703    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
704      Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
705  }
706
707  @Test
708  public void testMetaRWScanQueues() throws Exception {
709    Configuration schedConf = HBaseConfiguration.create();
710    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
711    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
712    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
713
714    PriorityFunction priority = mock(PriorityFunction.class);
715    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
716
717    RpcScheduler scheduler =
718      new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD);
719    try {
720      scheduler.start();
721
722      CallRunner putCallTask = mock(CallRunner.class);
723      ServerCall putCall = mock(ServerCall.class);
724      putCall.param =
725        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
726      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
727      when(putCallTask.getRpcCall()).thenReturn(putCall);
728      when(putCall.getHeader()).thenReturn(putHead);
729      when(putCall.getParam()).thenReturn(putCall.param);
730
731      CallRunner getCallTask = mock(CallRunner.class);
732      ServerCall getCall = mock(ServerCall.class);
733      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
734      when(getCallTask.getRpcCall()).thenReturn(getCall);
735      when(getCall.getHeader()).thenReturn(getHead);
736
737      CallRunner scanCallTask = mock(CallRunner.class);
738      ServerCall scanCall = mock(ServerCall.class);
739      scanCall.param = ScanRequest.newBuilder().build();
740      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
741      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
742      when(scanCall.getHeader()).thenReturn(scanHead);
743      when(scanCall.getParam()).thenReturn(scanCall.param);
744
745      ArrayList<Integer> work = new ArrayList<>();
746      doAnswerTaskExecution(putCallTask, work, 1, 1000);
747      doAnswerTaskExecution(getCallTask, work, 2, 1000);
748      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
749
750      // There are 3 queues: [puts], [gets], [scans]
751      // so the calls will be interleaved
752      scheduler.dispatch(putCallTask);
753      scheduler.dispatch(putCallTask);
754      scheduler.dispatch(putCallTask);
755      scheduler.dispatch(getCallTask);
756      scheduler.dispatch(getCallTask);
757      scheduler.dispatch(getCallTask);
758      scheduler.dispatch(scanCallTask);
759      scheduler.dispatch(scanCallTask);
760      scheduler.dispatch(scanCallTask);
761
762      while (work.size() < 6) {
763        Thread.sleep(100);
764      }
765
766      for (int i = 0; i < work.size() - 2; i += 3) {
767        assertNotEquals(work.get(i + 0), work.get(i + 1));
768        assertNotEquals(work.get(i + 0), work.get(i + 2));
769        assertNotEquals(work.get(i + 1), work.get(i + 2));
770      }
771    } finally {
772      scheduler.stop();
773    }
774  }
775
776  // Get mocked call that has the CallRunner sleep for a while so that the fast
777  // path isn't hit.
778  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
779    ServerCall putCall = new ServerCall(1, null, null,
780      RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
781      RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
782      null, null, 9, null, timestamp, 0, null, null, null) {
783
784      @Override
785      public void sendResponseIfReady() throws IOException {
786      }
787    };
788
789    return new CallRunner(null, putCall) {
790      @Override
791      public void run() {
792        if (sleepTime <= 0) {
793          return;
794        }
795        try {
796          LOG.warn("Sleeping for " + sleepTime);
797          Thread.sleep(sleepTime);
798          LOG.warn("Done Sleeping for " + sleepTime);
799        } catch (InterruptedException e) {
800        }
801      }
802
803      @Override
804      public RpcCall getRpcCall() {
805        return putCall;
806      }
807
808      @Override
809      public void drop() {
810      }
811    };
812  }
813}