001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.timeout;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.reflect.Field;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.LinkedBlockingQueue;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.Abortable;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
050import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RPCTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdge;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.Threads;
057import org.junit.jupiter.api.BeforeEach;
058import org.junit.jupiter.api.Tag;
059import org.junit.jupiter.api.Test;
060import org.junit.jupiter.api.TestInfo;
061import org.mockito.Mockito;
062import org.mockito.invocation.InvocationOnMock;
063import org.mockito.stubbing.Answer;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
070import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
071
072import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
076
077@Tag(RPCTests.TAG)
078@Tag(MediumTests.TAG)
079public class TestSimpleRpcScheduler {
080
  /** Logger used by the tests and helpers of this class. */
  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);

  /**
   * Scheduler context passed to {@code init}; its listener address is the unresolved address
   * 127.0.0.1:1000.
   */
  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
    @Override
    public InetSocketAddress getListenerAddress() {
      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
    }
  };
  /** Configuration recreated by {@code setUp} before every test. */
  private Configuration conf;
  /** Name of the running test method, captured by {@code setUp}. */
  private String testMethodName;
092  @BeforeEach
093  public void setUp(TestInfo testInfo) {
094    testMethodName = testInfo.getTestMethod().get().getName();
095    conf = HBaseConfiguration.create();
096  }
097
098  @Test
099  public void testBasic() throws IOException, InterruptedException {
100
101    PriorityFunction qosFunction = mock(PriorityFunction.class);
102    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
103    scheduler.init(CONTEXT);
104    scheduler.start();
105    CallRunner task = createMockTask(HConstants.NORMAL_QOS);
106    task.setStatus(new MonitoredRPCHandlerImpl("test"));
107    scheduler.dispatch(task);
108    verify(task, timeout(10000)).run();
109    scheduler.stop();
110  }
111
112  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
113    try {
114      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
115      ExecutorField.setAccessible(true);
116
117      RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler);
118
119      Field handlerCountField =
120        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
121
122      handlerCountField.setAccessible(true);
123      handlerCountField.set(rpcExecutor, 0);
124
125      Field numCallQueuesField =
126        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
127
128      numCallQueuesField.setAccessible(true);
129      numCallQueuesField.set(rpcExecutor, 1);
130
131      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass()
132        .getDeclaredField("currentQueueLimit");
133
134      currentQueueLimitField.setAccessible(true);
135      currentQueueLimitField.set(rpcExecutor, 100);
136
137    } catch (NoSuchFieldException e) {
138      LOG.error("No such field exception" + e);
139    } catch (IllegalAccessException e) {
140      LOG.error("Illegal access exception" + e);
141    }
142
143    return scheduler;
144  }
145
146  @Test
147  public void testCallQueueInfo() throws IOException, InterruptedException {
148
149    PriorityFunction qosFunction = mock(PriorityFunction.class);
150    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0);
151
152    scheduler.init(CONTEXT);
153
154    // Set the handlers to zero. So that number of requests in call Queue can be tested
155    scheduler = disableHandlers(scheduler);
156    scheduler.start();
157
158    int totalCallMethods = 10;
159    for (int i = totalCallMethods; i > 0; i--) {
160      CallRunner task = createMockTask(HConstants.NORMAL_QOS);
161      task.setStatus(new MonitoredRPCHandlerImpl("test"));
162      scheduler.dispatch(task);
163    }
164
165    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
166
167    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
168
169      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
170        assertEquals(totalCallMethods,
171          callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
172      }
173
174    }
175
176    scheduler.stop();
177
178  }
179
180  @Test
181  public void testHandlerIsolation() throws IOException, InterruptedException {
182    CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
183    CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
184    CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
185    List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
186    Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
187      HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
188    PriorityFunction qosFunction = mock(PriorityFunction.class);
189    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
190    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
191    Answer<Void> answerToRun = new Answer<Void>() {
192      @Override
193      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
194        synchronized (handlerThreads) {
195          handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread());
196        }
197        countDownLatch.countDown();
198        return null;
199      }
200    };
201    for (CallRunner task : tasks) {
202      task.setStatus(new MonitoredRPCHandlerImpl("test"));
203      doAnswer(answerToRun).when(task).run();
204    }
205
206    RpcScheduler scheduler =
207      new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS);
208    scheduler.init(CONTEXT);
209    scheduler.start();
210    for (CallRunner task : tasks) {
211      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
212      scheduler.dispatch(task);
213    }
214    for (CallRunner task : tasks) {
215      verify(task, timeout(10000)).run();
216    }
217    scheduler.stop();
218
219    // Tests that these requests are handled by three distinct threads.
220    countDownLatch.await();
221    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
222  }
223
224  private CallRunner createMockTask(int priority) {
225    ServerCall call = mock(ServerCall.class);
226    CallRunner task = mock(CallRunner.class);
227    RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
228    when(task.getRpcCall()).thenReturn(call);
229    when(call.getHeader()).thenReturn(header);
230    return task;
231  }
232
233  @Test
234  public void testRpcScheduler() throws Exception {
235    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
236    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
237  }
238
239  @Test
240  public void testPluggableRpcQueue() throws Exception {
241    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
242      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
243
244    try {
245      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass");
246      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
247    } catch (PluggableRpcQueueNotFound e) {
248      // expected
249    } catch (Exception e) {
250      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
251    }
252
253    try {
254      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
255        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
256      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
257    } catch (PluggableRpcQueueNotFound e) {
258      // expected
259    } catch (Exception e) {
260      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
261    }
262  }
263
264  @Test
265  public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
266    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
267    Configuration schedConf = HBaseConfiguration.create();
268    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
269    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
270      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
271    schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
272
273    PriorityFunction priority = mock(PriorityFunction.class);
274    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
275    SimpleRpcScheduler scheduler =
276      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
277
278    Field f = scheduler.getClass().getDeclaredField("callExecutor");
279    f.setAccessible(true);
280    assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
281  }
282
283  @Test
284  public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
285    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
286    Configuration schedConf = HBaseConfiguration.create();
287    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
288    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
289      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
290
291    PriorityFunction priority = mock(PriorityFunction.class);
292    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
293    SimpleRpcScheduler scheduler =
294      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
295
296    Field f = scheduler.getClass().getDeclaredField("callExecutor");
297    f.setAccessible(true);
298    assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
299  }
300
301  @Test
302  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
303
304    Configuration schedConf = HBaseConfiguration.create();
305
306    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
307    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
308    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
309      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
310    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
311      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
312
313    PriorityFunction priority = mock(PriorityFunction.class);
314    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
315    SimpleRpcScheduler scheduler =
316      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
317    try {
318      scheduler.start();
319
320      CallRunner putCallTask = mock(CallRunner.class);
321      ServerCall putCall = mock(ServerCall.class);
322      putCall.param =
323        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
324      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
325      when(putCallTask.getRpcCall()).thenReturn(putCall);
326      when(putCall.getHeader()).thenReturn(putHead);
327
328      assertTrue(scheduler.dispatch(putCallTask));
329
330      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
331      scheduler.onConfigurationChange(schedConf);
332      assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
333      waitUntilQueueEmpty(scheduler);
334    } finally {
335      scheduler.stop();
336    }
337  }
338
339  private void testRpcScheduler(final String queueType) throws Exception {
340    testRpcScheduler(queueType, null);
341  }
342
343  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
344    throws Exception {
345    Configuration schedConf = HBaseConfiguration.create();
346    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
347
348    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
349      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
350    }
351
352    PriorityFunction priority = mock(PriorityFunction.class);
353    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
354
355    RpcScheduler scheduler =
356      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
357    try {
358      scheduler.start();
359
360      CallRunner smallCallTask = mock(CallRunner.class);
361      ServerCall smallCall = mock(ServerCall.class);
362      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
363      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
364      when(smallCall.getHeader()).thenReturn(smallHead);
365
366      CallRunner largeCallTask = mock(CallRunner.class);
367      ServerCall largeCall = mock(ServerCall.class);
368      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
369      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
370      when(largeCall.getHeader()).thenReturn(largeHead);
371
372      CallRunner hugeCallTask = mock(CallRunner.class);
373      ServerCall hugeCall = mock(ServerCall.class);
374      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
375      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
376      when(hugeCall.getHeader()).thenReturn(hugeHead);
377
378      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
379      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
380      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
381
382      final ArrayList<Integer> work = new ArrayList<>();
383      doAnswerTaskExecution(smallCallTask, work, 10, 250);
384      doAnswerTaskExecution(largeCallTask, work, 50, 250);
385      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
386
387      scheduler.dispatch(smallCallTask);
388      scheduler.dispatch(smallCallTask);
389      scheduler.dispatch(smallCallTask);
390      scheduler.dispatch(hugeCallTask);
391      scheduler.dispatch(smallCallTask);
392      scheduler.dispatch(largeCallTask);
393      scheduler.dispatch(smallCallTask);
394      scheduler.dispatch(smallCallTask);
395
396      while (work.size() < 8) {
397        Thread.sleep(100);
398      }
399
400      int seqSum = 0;
401      int totalTime = 0;
402      for (int i = 0; i < work.size(); ++i) {
403        LOG.debug("Request i=" + i + " value=" + work.get(i));
404        seqSum += work.get(i);
405        totalTime += seqSum;
406      }
407      LOG.debug("Total Time: " + totalTime);
408
409      // -> [small small small huge small large small small]
410      // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
411      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
412      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
413        assertEquals(530, totalTime);
414      } else if (
415        queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)
416          || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)
417      ) {
418        assertEquals(930, totalTime);
419      }
420    } finally {
421      scheduler.stop();
422    }
423  }
424
425  @Test
426  public void testScanQueueWithZeroScanRatio() throws Exception {
427
428    Configuration schedConf = HBaseConfiguration.create();
429    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
430    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
431    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
432
433    PriorityFunction priority = mock(PriorityFunction.class);
434    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
435
436    RpcScheduler scheduler =
437      new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD);
438    assertNotEquals(null, scheduler);
439  }
440
441  @Test
442  public void testScanQueues() throws Exception {
443    Configuration schedConf = HBaseConfiguration.create();
444    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
445    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
446    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
447
448    PriorityFunction priority = mock(PriorityFunction.class);
449    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
450
451    RpcScheduler scheduler =
452      new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD);
453    try {
454      scheduler.start();
455
456      CallRunner putCallTask = mock(CallRunner.class);
457      ServerCall putCall = mock(ServerCall.class);
458      putCall.param =
459        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
460      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
461      when(putCallTask.getRpcCall()).thenReturn(putCall);
462      when(putCall.getHeader()).thenReturn(putHead);
463      when(putCall.getParam()).thenReturn(putCall.param);
464
465      CallRunner getCallTask = mock(CallRunner.class);
466      ServerCall getCall = mock(ServerCall.class);
467      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
468      when(getCallTask.getRpcCall()).thenReturn(getCall);
469      when(getCall.getHeader()).thenReturn(getHead);
470
471      CallRunner scanCallTask = mock(CallRunner.class);
472      ServerCall scanCall = mock(ServerCall.class);
473      scanCall.param = ScanRequest.newBuilder().build();
474      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
475      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
476      when(scanCall.getHeader()).thenReturn(scanHead);
477      when(scanCall.getParam()).thenReturn(scanCall.param);
478
479      CallRunner bulkLoadCallTask = mock(CallRunner.class);
480      ServerCall bulkLoadCall = mock(ServerCall.class);
481      bulkLoadCall.param = ScanRequest.newBuilder().build();
482      RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build();
483      when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
484      when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
485      when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
486
487      ArrayList<Integer> work = new ArrayList<>();
488      doAnswerTaskExecution(putCallTask, work, 1, 1000);
489      doAnswerTaskExecution(getCallTask, work, 2, 1000);
490      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
491      doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
492
493      // There are 3 queues: [puts], [gets], [scans], [bulkload]
494      // so the calls will be interleaved
495      scheduler.dispatch(putCallTask);
496      scheduler.dispatch(putCallTask);
497      scheduler.dispatch(putCallTask);
498      scheduler.dispatch(getCallTask);
499      scheduler.dispatch(getCallTask);
500      scheduler.dispatch(getCallTask);
501      scheduler.dispatch(scanCallTask);
502      scheduler.dispatch(scanCallTask);
503      scheduler.dispatch(scanCallTask);
504      scheduler.dispatch(bulkLoadCallTask);
505      scheduler.dispatch(bulkLoadCallTask);
506      scheduler.dispatch(bulkLoadCallTask);
507      while (work.size() < 6) {
508        Thread.sleep(100);
509      }
510
511      for (int i = 0; i < work.size() - 2; i += 3) {
512        assertNotEquals(work.get(i + 0), work.get(i + 1));
513        assertNotEquals(work.get(i + 0), work.get(i + 2));
514        assertNotEquals(work.get(i + 1), work.get(i + 2));
515      }
516    } finally {
517      scheduler.stop();
518    }
519  }
520
521  private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
522    final int value, final int sleepInterval) {
523    callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
524    doAnswer(new Answer<Object>() {
525      @Override
526      public Object answer(InvocationOnMock invocation) {
527        synchronized (results) {
528          results.add(value);
529        }
530        Threads.sleepWithoutInterrupt(sleepInterval);
531        return null;
532      }
533    }).when(callTask).run();
534  }
535
536  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
537    throws InterruptedException {
538    while (scheduler.getGeneralQueueLength() > 0) {
539      Thread.sleep(100);
540    }
541  }
542
543  @Test
544  public void testSoftAndHardQueueLimits() throws Exception {
545
546    Configuration schedConf = HBaseConfiguration.create();
547
548    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
549    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
550    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
551      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
552
553    PriorityFunction priority = mock(PriorityFunction.class);
554    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
555    SimpleRpcScheduler scheduler =
556      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
557    try {
558      scheduler.start();
559
560      CallRunner putCallTask = mock(CallRunner.class);
561      ServerCall putCall = mock(ServerCall.class);
562      putCall.param =
563        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
564      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
565      when(putCallTask.getRpcCall()).thenReturn(putCall);
566      when(putCall.getHeader()).thenReturn(putHead);
567
568      assertTrue(scheduler.dispatch(putCallTask));
569
570      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
571      scheduler.onConfigurationChange(schedConf);
572      assertFalse(scheduler.dispatch(putCallTask));
573      waitUntilQueueEmpty(scheduler);
574      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
575      scheduler.onConfigurationChange(schedConf);
576      assertTrue(scheduler.dispatch(putCallTask));
577    } finally {
578      scheduler.stop();
579    }
580  }
581
582  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
583
584    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
585
586    private long offset;
587
588    private final Set<String> threadNamePrefixs = new HashSet<>();
589
590    @Override
591    public long currentTime() {
592      for (String threadNamePrefix : threadNamePrefixs) {
593        String threadName = Thread.currentThread().getName();
594        if (threadName.startsWith(threadNamePrefix)) {
595          if (timeQ != null) {
596            Long qTime = timeQ.poll();
597            if (qTime != null) {
598              return qTime.longValue() + offset;
599            }
600          }
601        }
602      }
603      return System.currentTimeMillis();
604    }
605  }
606
  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
  // from the get-go, so we are always overloaded. The test below would seem to complete the
  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
  // codel checking. Second, I think this test has been broken since HBASE-16089 "Add on FastPath
  // for CoDel" went in. The thread name we were looking for was the name BEFORE we updated, i.e.
  // "RpcServer.CodelBQ.default.handler", but the same patch changed the name of the codel fastpath
  // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
  // numCallQueues... Codel is hard to test. This test is going to be flaky given it is all
  // timer-based. Disabling for now till chat with authors.
  // NOTE(review): the test below still carries @Test and no disabling annotation, so the
  // "disabling" remark above looks stale -- confirm.
616  @Test
617  public void testCoDelScheduling() throws Exception {
618    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
619    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
620    Configuration schedConf = HBaseConfiguration.create();
621    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
622    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
623      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
624    PriorityFunction priority = mock(PriorityFunction.class);
625    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
626    SimpleRpcScheduler scheduler =
627      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
628    try {
629      // Loading mocked call runner can take a good amount of time the first time through
630      // (haven't looked why). Load it for first time here outside of the timed loop.
631      getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2);
632      scheduler.start();
633      EnvironmentEdgeManager.injectEdge(envEdge);
634      envEdge.offset = 5;
635      // Calls faster than min delay
636      // LOG.info("Start");
637      for (int i = 0; i < 100; i++) {
638        long time = EnvironmentEdgeManager.currentTime();
639        envEdge.timeQ.put(time);
640        CallRunner cr = getMockedCallRunner(time, 2);
641        scheduler.dispatch(cr);
642      }
643      // LOG.info("Loop done");
644      // make sure fast calls are handled
645      waitUntilQueueEmpty(scheduler);
646      Thread.sleep(100);
647      assertEquals(0, scheduler.getNumGeneralCallsDropped(),
648        "None of these calls should have been discarded");
649
650      envEdge.offset = 151;
651      // calls slower than min delay, but not individually slow enough to be dropped
652      for (int i = 0; i < 20; i++) {
653        long time = EnvironmentEdgeManager.currentTime();
654        envEdge.timeQ.put(time);
655        CallRunner cr = getMockedCallRunner(time, 2);
656        scheduler.dispatch(cr);
657      }
658
659      // make sure somewhat slow calls are handled
660      waitUntilQueueEmpty(scheduler);
661      Thread.sleep(100);
662      assertEquals(0, scheduler.getNumGeneralCallsDropped(),
663        "None of these calls should have been discarded");
664
665      envEdge.offset = 2000;
666      // now slow calls and the ones to be dropped
667      for (int i = 0; i < 60; i++) {
668        long time = EnvironmentEdgeManager.currentTime();
669        envEdge.timeQ.put(time);
670        CallRunner cr = getMockedCallRunner(time, 100);
671        scheduler.dispatch(cr);
672      }
673
674      // make sure somewhat slow calls are handled
675      waitUntilQueueEmpty(scheduler);
676      Thread.sleep(100);
677      assertTrue(scheduler.getNumGeneralCallsDropped() > 12,
678        "There should have been at least 12 calls dropped however there were "
679          + scheduler.getNumGeneralCallsDropped());
680    } finally {
681      scheduler.stop();
682    }
683  }
684
685  @Test
686  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
687    String name = testMethodName;
688    int handlerCount = 1;
689    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
690    int maxQueueLength = 0;
691    PriorityFunction priority = mock(PriorityFunction.class);
692    Configuration conf = HBaseConfiguration.create();
693    Abortable abortable = mock(Abortable.class);
694    FastPathBalancedQueueRpcExecutor executor =
695      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType,
696        maxQueueLength, priority, conf, abortable));
697    CallRunner task = mock(CallRunner.class);
698    assertFalse(executor.dispatch(task));
699    // make sure we never internally get a handler, which would skip the queue validation
700    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
701      Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
702  }
703
704  @Test
705  public void testMetaRWScanQueues() throws Exception {
706    Configuration schedConf = HBaseConfiguration.create();
707    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
708    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
709    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
710
711    PriorityFunction priority = mock(PriorityFunction.class);
712    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
713
714    RpcScheduler scheduler =
715      new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD);
716    try {
717      scheduler.start();
718
719      CallRunner putCallTask = mock(CallRunner.class);
720      ServerCall putCall = mock(ServerCall.class);
721      putCall.param =
722        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
723      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
724      when(putCallTask.getRpcCall()).thenReturn(putCall);
725      when(putCall.getHeader()).thenReturn(putHead);
726      when(putCall.getParam()).thenReturn(putCall.param);
727
728      CallRunner clientReadCallTask = mock(CallRunner.class);
729      ServerCall clientReadCall = mock(ServerCall.class);
730      RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
731      when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
732      when(clientReadCall.getHeader()).thenReturn(clientReadHead);
733
734      CallRunner internalReadCallTask = mock(CallRunner.class);
735      ServerCall internalReadCall = mock(ServerCall.class);
736      internalReadCall.param = ScanRequest.newBuilder().build();
737      RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan")
738        .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build();
739      when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
740      when(internalReadCall.getHeader()).thenReturn(masterReadHead);
741      when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
742
743      ArrayList<Integer> work = new ArrayList<>();
744      doAnswerTaskExecution(putCallTask, work, 1, 1000);
745      doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
746      doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
747
748      // There are 3 queues: [puts], [gets], [scans]
749      // so the calls will be interleaved
750      scheduler.dispatch(putCallTask);
751      scheduler.dispatch(putCallTask);
752      scheduler.dispatch(putCallTask);
753      scheduler.dispatch(clientReadCallTask);
754      scheduler.dispatch(clientReadCallTask);
755      scheduler.dispatch(clientReadCallTask);
756      scheduler.dispatch(internalReadCallTask);
757      scheduler.dispatch(internalReadCallTask);
758      scheduler.dispatch(internalReadCallTask);
759
760      while (work.size() < 6) {
761        Thread.sleep(100);
762      }
763
764      for (int i = 0; i < work.size() - 2; i += 3) {
765        assertNotEquals(work.get(i + 0), work.get(i + 1));
766        assertNotEquals(work.get(i + 0), work.get(i + 2));
767        assertNotEquals(work.get(i + 1), work.get(i + 2));
768      }
769    } finally {
770      scheduler.stop();
771    }
772  }
773
774  // Get mocked call that has the CallRunner sleep for a while so that the fast
775  // path isn't hit.
776  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
777    ServerCall putCall = new ServerCall(1, null, null,
778      RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
779      RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
780      null, null, 9, null, timestamp, 0, null, null, null) {
781
782      @Override
783      public void sendResponseIfReady() throws IOException {
784      }
785    };
786
787    return new CallRunner(null, putCall) {
788      @Override
789      public void run() {
790        if (sleepTime <= 0) {
791          return;
792        }
793        try {
794          LOG.warn("Sleeping for " + sleepTime);
795          Thread.sleep(sleepTime);
796          LOG.warn("Done Sleeping for " + sleepTime);
797        } catch (InterruptedException e) {
798        }
799      }
800
801      @Override
802      public RpcCall getRpcCall() {
803        return putCall;
804      }
805
806      @Override
807      public void drop() {
808      }
809    };
810  }
811}