001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.awaitility.Awaitility.await;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertNotEquals;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.junit.jupiter.api.Assertions.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.ArgumentMatchers.eq;
028import static org.mockito.Mockito.doAnswer;
029import static org.mockito.Mockito.mock;
030import static org.mockito.Mockito.timeout;
031import static org.mockito.Mockito.verify;
032import static org.mockito.Mockito.when;
033
034import java.io.IOException;
035import java.lang.reflect.Field;
036import java.net.InetSocketAddress;
037import java.util.ArrayList;
038import java.util.Collections;
039import java.util.HashSet;
040import java.util.List;
041import java.util.Map;
042import java.util.Set;
043import java.util.concurrent.CountDownLatch;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicInteger;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.Abortable;
048import org.apache.hadoop.hbase.HBaseConfiguration;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
052import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.testclassification.RPCTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.EnvironmentEdge;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.Threads;
059import org.junit.jupiter.api.BeforeEach;
060import org.junit.jupiter.api.Tag;
061import org.junit.jupiter.api.Test;
062import org.junit.jupiter.api.TestInfo;
063import org.mockito.Mockito;
064import org.mockito.invocation.InvocationOnMock;
065import org.mockito.stubbing.Answer;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
072import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
073
074import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
078
079@Tag(RPCTests.TAG)
080@Tag(MediumTests.TAG)
081public class TestSimpleRpcScheduler {
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
084
085  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
086    @Override
087    public InetSocketAddress getListenerAddress() {
088      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
089    }
090  };
091  private Configuration conf;
092  private String testMethodName;
093  private final AtomicInteger requestProcessed = new AtomicInteger(0);
094
095  @BeforeEach
096  public void setUp(TestInfo testInfo) {
097    testMethodName = testInfo.getTestMethod().get().getName();
098    conf = HBaseConfiguration.create();
099  }
100
101  @Test
102  public void testBasic() throws IOException, InterruptedException {
103
104    PriorityFunction qosFunction = mock(PriorityFunction.class);
105    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
106    scheduler.init(CONTEXT);
107    scheduler.start();
108    CallRunner task = createMockTask(HConstants.NORMAL_QOS);
109    task.setStatus(new MonitoredRPCHandlerImpl("test"));
110    scheduler.dispatch(task);
111    verify(task, timeout(10000)).run();
112    scheduler.stop();
113  }
114
115  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
116    try {
117      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
118      ExecutorField.setAccessible(true);
119
120      RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler);
121
122      Field handlerCountField =
123        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
124
125      handlerCountField.setAccessible(true);
126      handlerCountField.set(rpcExecutor, 0);
127
128      Field numCallQueuesField =
129        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
130
131      numCallQueuesField.setAccessible(true);
132      numCallQueuesField.set(rpcExecutor, 1);
133
134      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass()
135        .getDeclaredField("currentQueueLimit");
136
137      currentQueueLimitField.setAccessible(true);
138      currentQueueLimitField.set(rpcExecutor, 100);
139
140    } catch (NoSuchFieldException e) {
141      LOG.error("No such field exception" + e);
142    } catch (IllegalAccessException e) {
143      LOG.error("Illegal access exception" + e);
144    }
145
146    return scheduler;
147  }
148
149  @Test
150  public void testCallQueueInfo() throws IOException, InterruptedException {
151
152    PriorityFunction qosFunction = mock(PriorityFunction.class);
153    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0);
154
155    scheduler.init(CONTEXT);
156
157    // Set the handlers to zero. So that number of requests in call Queue can be tested
158    scheduler = disableHandlers(scheduler);
159    scheduler.start();
160
161    int totalCallMethods = 10;
162    for (int i = totalCallMethods; i > 0; i--) {
163      CallRunner task = createMockTask(HConstants.NORMAL_QOS);
164      task.setStatus(new MonitoredRPCHandlerImpl("test"));
165      scheduler.dispatch(task);
166    }
167
168    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
169
170    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
171
172      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
173        assertEquals(totalCallMethods,
174          callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
175      }
176
177    }
178
179    scheduler.stop();
180
181  }
182
183  @Test
184  public void testHandlerIsolation() throws IOException, InterruptedException {
185    CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
186    CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
187    CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
188    List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
189    Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
190      HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
191    PriorityFunction qosFunction = mock(PriorityFunction.class);
192    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
193    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
194    Answer<Void> answerToRun = new Answer<Void>() {
195      @Override
196      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
197        synchronized (handlerThreads) {
198          handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread());
199        }
200        countDownLatch.countDown();
201        return null;
202      }
203    };
204    for (CallRunner task : tasks) {
205      task.setStatus(new MonitoredRPCHandlerImpl("test"));
206      doAnswer(answerToRun).when(task).run();
207    }
208
209    RpcScheduler scheduler =
210      new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS);
211    scheduler.init(CONTEXT);
212    scheduler.start();
213    for (CallRunner task : tasks) {
214      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
215      scheduler.dispatch(task);
216    }
217    for (CallRunner task : tasks) {
218      verify(task, timeout(10000)).run();
219    }
220    scheduler.stop();
221
222    // Tests that these requests are handled by three distinct threads.
223    countDownLatch.await();
224    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
225  }
226
227  private CallRunner createMockTask(int priority) {
228    ServerCall call = mock(ServerCall.class);
229    CallRunner task = mock(CallRunner.class);
230    RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
231    when(task.getRpcCall()).thenReturn(call);
232    when(call.getHeader()).thenReturn(header);
233    when(call.getReceiveTime()).thenReturn(EnvironmentEdgeManager.currentTime());
234    return task;
235  }
236
237  @Test
238  public void testRpcScheduler() throws Exception {
239    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
240    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
241  }
242
243  @Test
244  public void testPluggableRpcQueue() throws Exception {
245    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
246      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
247
248    try {
249      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass");
250      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
251    } catch (PluggableRpcQueueNotFound e) {
252      // expected
253    } catch (Exception e) {
254      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
255    }
256
257    try {
258      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
259        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
260      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
261    } catch (PluggableRpcQueueNotFound e) {
262      // expected
263    } catch (Exception e) {
264      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
265    }
266  }
267
268  @Test
269  public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
270    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
271    Configuration schedConf = HBaseConfiguration.create();
272    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
273    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
274      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
275    schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
276
277    PriorityFunction priority = mock(PriorityFunction.class);
278    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
279    SimpleRpcScheduler scheduler =
280      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
281
282    Field f = scheduler.getClass().getDeclaredField("callExecutor");
283    f.setAccessible(true);
284    assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
285  }
286
287  @Test
288  public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
289    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
290    Configuration schedConf = HBaseConfiguration.create();
291    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
292    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
293      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
294
295    PriorityFunction priority = mock(PriorityFunction.class);
296    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
297    SimpleRpcScheduler scheduler =
298      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
299
300    Field f = scheduler.getClass().getDeclaredField("callExecutor");
301    f.setAccessible(true);
302    assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
303  }
304
305  @Test
306  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
307
308    Configuration schedConf = HBaseConfiguration.create();
309
310    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
311    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
312    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
313      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
314    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
315      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
316
317    PriorityFunction priority = mock(PriorityFunction.class);
318    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
319    SimpleRpcScheduler scheduler =
320      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
321    try {
322      scheduler.start();
323
324      CallRunner putCallTask = mock(CallRunner.class);
325      ServerCall putCall = mock(ServerCall.class);
326      putCall.param =
327        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
328      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
329      when(putCallTask.getRpcCall()).thenReturn(putCall);
330      when(putCall.getHeader()).thenReturn(putHead);
331
332      assertTrue(scheduler.dispatch(putCallTask));
333
334      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
335      scheduler.onConfigurationChange(schedConf);
336      assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
337      waitUntilQueueEmpty(scheduler);
338    } finally {
339      scheduler.stop();
340    }
341  }
342
343  private void testRpcScheduler(final String queueType) throws Exception {
344    testRpcScheduler(queueType, null);
345  }
346
347  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
348    throws Exception {
349    Configuration schedConf = HBaseConfiguration.create();
350    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
351
352    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
353      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
354    }
355
356    PriorityFunction priority = mock(PriorityFunction.class);
357    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
358
359    RpcScheduler scheduler =
360      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
361    try {
362      scheduler.start();
363
364      CallRunner smallCallTask = mock(CallRunner.class);
365      ServerCall smallCall = mock(ServerCall.class);
366      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
367      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
368      when(smallCall.getHeader()).thenReturn(smallHead);
369
370      CallRunner largeCallTask = mock(CallRunner.class);
371      ServerCall largeCall = mock(ServerCall.class);
372      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
373      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
374      when(largeCall.getHeader()).thenReturn(largeHead);
375
376      CallRunner hugeCallTask = mock(CallRunner.class);
377      ServerCall hugeCall = mock(ServerCall.class);
378      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
379      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
380      when(hugeCall.getHeader()).thenReturn(hugeHead);
381
382      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
383      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
384      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
385
386      final ArrayList<Integer> work = new ArrayList<>();
387      doAnswerTaskExecution(smallCallTask, work, 10, 250);
388      doAnswerTaskExecution(largeCallTask, work, 50, 250);
389      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
390
391      scheduler.dispatch(smallCallTask);
392      scheduler.dispatch(smallCallTask);
393      scheduler.dispatch(smallCallTask);
394      scheduler.dispatch(hugeCallTask);
395      scheduler.dispatch(smallCallTask);
396      scheduler.dispatch(largeCallTask);
397      scheduler.dispatch(smallCallTask);
398      scheduler.dispatch(smallCallTask);
399
400      while (work.size() < 8) {
401        Thread.sleep(100);
402      }
403
404      int seqSum = 0;
405      int totalTime = 0;
406      for (int i = 0; i < work.size(); ++i) {
407        LOG.debug("Request i=" + i + " value=" + work.get(i));
408        seqSum += work.get(i);
409        totalTime += seqSum;
410      }
411      LOG.debug("Total Time: " + totalTime);
412
413      // -> [small small small huge small large small small]
414      // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
415      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
416      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
417        assertEquals(530, totalTime);
418      } else if (
419        queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)
420          || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)
421      ) {
422        assertEquals(930, totalTime);
423      }
424    } finally {
425      scheduler.stop();
426    }
427  }
428
429  @Test
430  public void testScanQueueWithZeroScanRatio() throws Exception {
431
432    Configuration schedConf = HBaseConfiguration.create();
433    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
434    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
435    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
436
437    PriorityFunction priority = mock(PriorityFunction.class);
438    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
439
440    RpcScheduler scheduler =
441      new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD);
442    assertNotEquals(null, scheduler);
443  }
444
445  @Test
446  public void testScanQueues() throws Exception {
447    Configuration schedConf = HBaseConfiguration.create();
448    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
449    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
450    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
451
452    PriorityFunction priority = mock(PriorityFunction.class);
453    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
454
455    RpcScheduler scheduler =
456      new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD);
457    try {
458      scheduler.start();
459
460      CallRunner putCallTask = mock(CallRunner.class);
461      ServerCall putCall = mock(ServerCall.class);
462      putCall.param =
463        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
464      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
465      when(putCallTask.getRpcCall()).thenReturn(putCall);
466      when(putCall.getHeader()).thenReturn(putHead);
467      when(putCall.getParam()).thenReturn(putCall.param);
468
469      CallRunner getCallTask = mock(CallRunner.class);
470      ServerCall getCall = mock(ServerCall.class);
471      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
472      when(getCallTask.getRpcCall()).thenReturn(getCall);
473      when(getCall.getHeader()).thenReturn(getHead);
474
475      CallRunner scanCallTask = mock(CallRunner.class);
476      ServerCall scanCall = mock(ServerCall.class);
477      scanCall.param = ScanRequest.newBuilder().build();
478      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
479      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
480      when(scanCall.getHeader()).thenReturn(scanHead);
481      when(scanCall.getParam()).thenReturn(scanCall.param);
482
483      CallRunner bulkLoadCallTask = mock(CallRunner.class);
484      ServerCall bulkLoadCall = mock(ServerCall.class);
485      bulkLoadCall.param = ScanRequest.newBuilder().build();
486      RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build();
487      when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
488      when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
489      when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
490
491      ArrayList<Integer> work = new ArrayList<>();
492      doAnswerTaskExecution(putCallTask, work, 1, 1000);
493      doAnswerTaskExecution(getCallTask, work, 2, 1000);
494      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
495      doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
496
497      // There are 3 queues: [puts], [gets], [scans], [bulkload]
498      // so the calls will be interleaved
499      scheduler.dispatch(putCallTask);
500      scheduler.dispatch(putCallTask);
501      scheduler.dispatch(putCallTask);
502      scheduler.dispatch(getCallTask);
503      scheduler.dispatch(getCallTask);
504      scheduler.dispatch(getCallTask);
505      scheduler.dispatch(scanCallTask);
506      scheduler.dispatch(scanCallTask);
507      scheduler.dispatch(scanCallTask);
508      scheduler.dispatch(bulkLoadCallTask);
509      scheduler.dispatch(bulkLoadCallTask);
510      scheduler.dispatch(bulkLoadCallTask);
511      while (work.size() < 6) {
512        Thread.sleep(100);
513      }
514
515      for (int i = 0; i < work.size() - 2; i += 3) {
516        assertNotEquals(work.get(i + 0), work.get(i + 1));
517        assertNotEquals(work.get(i + 0), work.get(i + 2));
518        assertNotEquals(work.get(i + 1), work.get(i + 2));
519      }
520    } finally {
521      scheduler.stop();
522    }
523  }
524
525  private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
526    final int value, final int sleepInterval) {
527    callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
528    doAnswer(new Answer<Object>() {
529      @Override
530      public Object answer(InvocationOnMock invocation) {
531        synchronized (results) {
532          results.add(value);
533        }
534        Threads.sleepWithoutInterrupt(sleepInterval);
535        return null;
536      }
537    }).when(callTask).run();
538  }
539
540  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
541    throws InterruptedException {
542    while (scheduler.getGeneralQueueLength() > 0) {
543      Thread.sleep(100);
544    }
545  }
546
547  @Test
548  public void testSoftAndHardQueueLimits() throws Exception {
549
550    Configuration schedConf = HBaseConfiguration.create();
551
552    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
553    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
554    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
555      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
556
557    PriorityFunction priority = mock(PriorityFunction.class);
558    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
559    SimpleRpcScheduler scheduler =
560      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
561    try {
562      scheduler.start();
563
564      CallRunner putCallTask = mock(CallRunner.class);
565      ServerCall putCall = mock(ServerCall.class);
566      putCall.param =
567        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
568      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
569      when(putCallTask.getRpcCall()).thenReturn(putCall);
570      when(putCall.getHeader()).thenReturn(putHead);
571
572      assertTrue(scheduler.dispatch(putCallTask));
573
574      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
575      scheduler.onConfigurationChange(schedConf);
576      assertFalse(scheduler.dispatch(putCallTask));
577      waitUntilQueueEmpty(scheduler);
578      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
579      scheduler.onConfigurationChange(schedConf);
580      assertTrue(scheduler.dispatch(putCallTask));
581    } finally {
582      scheduler.stop();
583    }
584  }
585
586  private final class CoDelEnvironmentEdge implements EnvironmentEdge {
587
588    private long perRequestOffset;
589    private long gapInRequests;
590    private long testStartTime = 0;
591
592    private int requestCount = 0;
593
594    private final Set<String> threadNamePrefixes = new HashSet<>();
595    private final String testThread = Thread.currentThread().getName();
596
597    @Override
598    public long currentTime() {
599      String threadName = Thread.currentThread().getName();
600      if (threadName.equals(testThread)) {
601        // test thread will create a request and add that to scheduler
602        // Replicating a constant rate of request arrival.
603        long requestStartTime = testStartTime + requestCount * gapInRequests;
604        requestCount++;
605        return requestStartTime;
606      }
607      // handler thread will pick the request from queue, this time will depend on processing time
608      // of handler
609      for (String threadNamePrefix : threadNamePrefixes) {
610        if (threadName.startsWith(threadNamePrefix)) {
611          long requestPickTime;
612          if (gapInRequests >= perRequestOffset) {
613            // it means handler can complete the processing of last request and will be free when it
614            // will come to serve next request. We don't need it in fast path but just in case for
615            // other tests
616            requestPickTime = testStartTime + requestProcessed.get() * gapInRequests;
617          } else {
618            // this means handler will still be busy processing the last requests when new request
619            // will come in queue
620            requestPickTime = testStartTime + requestProcessed.get() * perRequestOffset;
621          }
622          return requestPickTime;
623        }
624      }
625      return System.currentTimeMillis();
626    }
627
628    public void startTestFor(int arrivalRatePerSecond, long requestProcessingTime) {
629      this.perRequestOffset = requestProcessingTime;
630      this.gapInRequests = 1000L / arrivalRatePerSecond;
631      this.requestCount = 0;
632      requestProcessed.set(0);
633      this.testStartTime = System.currentTimeMillis();
634    }
635  }
636
637  // This test is fixing the processing time and arrival rate of the request and checking if CoDel
638  // can utilize the maximum capacity of system
639  @Test
640  public void testCoDelScheduling() throws Exception {
641    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
642    envEdge.threadNamePrefixes.add("RpcServer.default.FPBQ.Codel.handler");
643    Configuration schedConf = HBaseConfiguration.create();
644    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
645    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
646      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
647    schedConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 5);
648    PriorityFunction priority = mock(PriorityFunction.class);
649    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
650    SimpleRpcScheduler scheduler =
651      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
652
653    try {
654      scheduler.start();
655      EnvironmentEdgeManager.injectEdge(envEdge);
656
657      // incoming traffic < capacity
658      envEdge.startTestFor(20, 40);
659      for (int i = 0; i < 100; i++) {
660        long time = EnvironmentEdgeManager.currentTime();
661        CallRunner cr = getMockedCallRunner(time, 2);
662        scheduler.dispatch(cr);
663        Thread.sleep(50);
664      }
665      // LOG.info("Loop done");
666      // make sure fast calls are handled
667      waitUntilQueueEmpty(scheduler);
668      Thread.sleep(100);
669      assertEquals(0, scheduler.getNumGeneralCallsDropped(),
670        "None of these calls should have been discarded");
671
672      // incoming traffic = capacity
673      envEdge.startTestFor(20, 50);
674      for (int i = 0; i < 20; i++) {
675        long time = EnvironmentEdgeManager.currentTime();
676        CallRunner cr = getMockedCallRunner(time, 2);
677        scheduler.dispatch(cr);
678        // We have mocked the arrival time and the time request get picked from queue but we need to
679        // also make sure that queue fill at the arrival rate
680        Thread.sleep(50);
681      }
682
683      // make sure somewhat slow calls are handled
684      waitUntilQueueEmpty(scheduler);
685      Thread.sleep(100);
686      assertEquals(0, scheduler.getNumGeneralCallsDropped(),
687        "None of these calls should have been discarded");
688
689      // incoming traffic > capacity
690      envEdge.startTestFor(20, 60);
691      // now slow calls and the ones to be dropped
692      for (int i = 0; i < 60; i++) {
693        long time = EnvironmentEdgeManager.currentTime();
694        CallRunner cr = getMockedCallRunner(time, 100);
695        scheduler.dispatch(cr);
696        Thread.sleep(50);
697      }
698
699      // make sure somewhat slow calls are handled
700      waitUntilQueueEmpty(scheduler);
701      Thread.sleep(100);
702      // 60 calls will take 3 second with 20 rps. And in 3 second with 60 processing time we can
703      // only serve 50 request
704      assertTrue(requestProcessed.get() >= 48,
705        "Number of processed requests should be greater then 90% of capacity"
706          + requestProcessed.get());
707      assertTrue(scheduler.getNumGeneralCallsDropped() >= 9,
708        "There should have been at least 9 calls dropped however there were "
709          + scheduler.getNumGeneralCallsDropped());
710
711    } finally {
712      scheduler.stop();
713    }
714  }
715
716  @Test
717  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
718    String name = testMethodName;
719    int handlerCount = 1;
720    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
721    int maxQueueLength = 0;
722    PriorityFunction priority = mock(PriorityFunction.class);
723    Configuration conf = HBaseConfiguration.create();
724    Abortable abortable = mock(Abortable.class);
725    FastPathBalancedQueueRpcExecutor executor =
726      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType,
727        maxQueueLength, priority, conf, abortable));
728    CallRunner task = mock(CallRunner.class);
729    assertFalse(executor.dispatch(task));
730    // make sure we never internally get a handler, which would skip the queue validation
731    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
732      Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
733  }
734
735  @Test
736  public void testMetaRWScanQueues() throws Exception {
737    Configuration schedConf = HBaseConfiguration.create();
738    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
739    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
740    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
741
742    PriorityFunction priority = mock(PriorityFunction.class);
743    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
744
745    RpcScheduler scheduler =
746      new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD);
747    try {
748      scheduler.start();
749
750      CallRunner putCallTask = mock(CallRunner.class);
751      ServerCall putCall = mock(ServerCall.class);
752      putCall.param =
753        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
754      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
755      when(putCallTask.getRpcCall()).thenReturn(putCall);
756      when(putCall.getHeader()).thenReturn(putHead);
757      when(putCall.getParam()).thenReturn(putCall.param);
758
759      CallRunner clientReadCallTask = mock(CallRunner.class);
760      ServerCall clientReadCall = mock(ServerCall.class);
761      RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
762      when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
763      when(clientReadCall.getHeader()).thenReturn(clientReadHead);
764
765      CallRunner internalReadCallTask = mock(CallRunner.class);
766      ServerCall internalReadCall = mock(ServerCall.class);
767      internalReadCall.param = ScanRequest.newBuilder().build();
768      RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan")
769        .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build();
770      when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
771      when(internalReadCall.getHeader()).thenReturn(masterReadHead);
772      when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
773
774      ArrayList<Integer> work = new ArrayList<>();
775      doAnswerTaskExecution(putCallTask, work, 1, 1000);
776      doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
777      doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
778
779      // There are 3 queues: [puts], [gets], [scans]
780      // so the calls will be interleaved
781      scheduler.dispatch(putCallTask);
782      scheduler.dispatch(putCallTask);
783      scheduler.dispatch(putCallTask);
784      scheduler.dispatch(clientReadCallTask);
785      scheduler.dispatch(clientReadCallTask);
786      scheduler.dispatch(clientReadCallTask);
787      scheduler.dispatch(internalReadCallTask);
788      scheduler.dispatch(internalReadCallTask);
789      scheduler.dispatch(internalReadCallTask);
790
791      while (work.size() < 6) {
792        Thread.sleep(100);
793      }
794
795      for (int i = 0; i < work.size() - 2; i += 3) {
796        assertNotEquals(work.get(i + 0), work.get(i + 1));
797        assertNotEquals(work.get(i + 0), work.get(i + 2));
798        assertNotEquals(work.get(i + 1), work.get(i + 2));
799      }
800    } finally {
801      scheduler.stop();
802    }
803  }
804
805  // Get mocked call that has the CallRunner sleep for a while so that the fast
806  // path isn't hit.
807  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
808    ServerCall putCall = new ServerCall(1, null, null,
809      RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
810      RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
811      null, null, 9, null, timestamp, 0, null, null, null) {
812
813      @Override
814      public void sendResponseIfReady() throws IOException {
815      }
816    };
817
818    return new CallRunner(null, putCall) {
819      @Override
820      public void run() {
821        if (sleepTime <= 0) {
822          return;
823        }
824        try {
825          Thread.sleep(sleepTime);
826          requestProcessed.incrementAndGet();
827        } catch (InterruptedException e) {
828        }
829      }
830
831      @Override
832      public RpcCall getRpcCall() {
833        return putCall;
834      }
835
836      @Override
837      public void drop() {
838      }
839    };
840  }
841
842  /**
843   * Test LIFO switching behavior through actual RPC calls. This test verifies that when the queue
844   * fills beyond the LIFO threshold, newer calls are processed before older calls (LIFO mode).
845   */
846  @Test
847  public void testCoDelLifoWithRpcCalls() throws Exception {
848    Configuration testConf = HBaseConfiguration.create();
849    testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
850      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
851    int maxCallQueueLength = 50;
852    double coDelLifoThreshold = 0.8;
853    testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, maxCallQueueLength);
854    testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, coDelLifoThreshold);
855    testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 100);
856    testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_INTERVAL, 100);
857    testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); // Single handler to control
858                                                                // processing
859
860    PriorityFunction priority = mock(PriorityFunction.class);
861    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
862    SimpleRpcScheduler scheduler =
863      new SimpleRpcScheduler(testConf, 1, 0, 0, priority, HConstants.QOS_THRESHOLD);
864
865    try {
866      scheduler.init(CONTEXT);
867      scheduler.start();
868
869      // Track completion order
870      final List<Integer> completedCalls = Collections.synchronizedList(new ArrayList<>());
871
872      // Dispatch many slow calls rapidly to fill the queue beyond 80% threshold
873      // With queue limit of 50, we need > 40 calls to cross 80%
874      int numCalls = 48;
875      for (int i = 0; i < numCalls; i++) {
876        final int callId = i;
877        CallRunner call = createMockTask(HConstants.NORMAL_QOS);
878        call.setStatus(new MonitoredRPCHandlerImpl("test"));
879        doAnswer(invocation -> {
880          completedCalls.add(callId);
881          Thread.sleep(100); // Slow processing to allow queue to build up
882          return null;
883        }).when(call).run();
884        scheduler.dispatch(call);
885        // No delay between dispatches - rapidly fill the queue
886      }
887
888      // Wait for some calls to complete
889      await().atMost(2, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 3);
890
891      // Check that we had LIFO switches
892      long lifoSwitches = scheduler.getNumLifoModeSwitches();
893      assertTrue(lifoSwitches > 0,
894        "Should have switched to LIFO mode at least once, but got: " + lifoSwitches);
895
896      // Verify LIFO behavior: Among first completed calls, we should see higher call IDs
897      // (indicating later dispatched calls completed first)
898      int maxCallIdCompleted = -1;
899      for (int i = 0; i < completedCalls.size(); i++) {
900        maxCallIdCompleted = Math.max(maxCallIdCompleted, completedCalls.get(i));
901      }
902      // At least one of the early completed calls should have a high ID (>20)
903      // indicating LIFO processing
904      assertTrue(maxCallIdCompleted > maxCallQueueLength * coDelLifoThreshold,
905        "Expected LIFO behavior: early completed calls should include call arrived after threshold "
906          + "maxCallIdCompleted: " + maxCallIdCompleted);
907
908    } finally {
909      scheduler.stop();
910    }
911  }
912
913  /**
914   * Test that CoDel queue returns to FIFO mode after draining below threshold.
915   */
916  @Test
917  public void testCoDelQueueDrainAndFifoReturn() throws Exception {
918    Configuration testConf = HBaseConfiguration.create();
919    testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
920      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
921    testConf.setLong(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 100);
922    testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 50);
923    testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, 0.8);
924    testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
925
926    PriorityFunction priority = mock(PriorityFunction.class);
927    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
928    SimpleRpcScheduler scheduler =
929      new SimpleRpcScheduler(testConf, 2, 0, 0, priority, HConstants.QOS_THRESHOLD);
930
931    try {
932      scheduler.init(CONTEXT);
933      scheduler.start();
934
935      final List<Integer> completedCalls = Collections.synchronizedList(new ArrayList<>());
936
937      // Fill queue rapidly to trigger LIFO (>40 calls for 80% of 50)
938      for (int i = 0; i < 48; i++) {
939        final int callId = i;
940        CallRunner call = createMockTask(HConstants.NORMAL_QOS);
941        call.setStatus(new MonitoredRPCHandlerImpl("test"));
942        doAnswer(invocation -> {
943          completedCalls.add(callId);
944          Thread.sleep(80);
945          return null;
946        }).when(call).run();
947        scheduler.dispatch(call);
948      }
949
950      // Wait for calls to complete
951      await().atMost(1, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 3);
952      assertTrue(scheduler.getNumLifoModeSwitches() > 0, "Should have entered LIFO mode");
953
954      await().atMost(2, TimeUnit.SECONDS).until(
955        () -> scheduler.getGeneralQueueLength() == 0 && scheduler.getActiveRpcHandlerCount() == 0);
956
957      long pastNumLifoModeSwitches = scheduler.getNumLifoModeSwitches();
958      // Send new calls - should process in FIFO order
959      completedCalls.clear();
960      for (int i = 100; i < 105; i++) {
961        final int callId = i;
962        CallRunner call = createMockTask(HConstants.NORMAL_QOS);
963        call.setStatus(new MonitoredRPCHandlerImpl("test"));
964        doAnswer(invocation -> {
965          completedCalls.add(callId);
966          Thread.sleep(50);
967          return null;
968        }).when(call).run();
969        scheduler.dispatch(call);
970      }
971
972      // Wait for these calls to complete
973      await().atMost(2, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 2);
974
975      long newLifoSwitch = scheduler.getNumLifoModeSwitches() - pastNumLifoModeSwitches;
976      // Allow at most 1 violation due to concurrent execution by 2 handlers
977      assertEquals(0, newLifoSwitch,
978        "Queue should not switch to LIFO last 5 calls but number of LIFO switch are : "
979          + newLifoSwitch);
980    } finally {
981      scheduler.stop();
982    }
983  }
984
985}