001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.timeout;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.reflect.Field;
035import java.net.InetSocketAddress;
036import java.util.ArrayList;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.LinkedBlockingQueue;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.Abortable;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseConfiguration;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
051import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.testclassification.RPCTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.EnvironmentEdge;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.Threads;
058import org.junit.Before;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064import org.mockito.Mockito;
065import org.mockito.invocation.InvocationOnMock;
066import org.mockito.stubbing.Answer;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
072import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
073import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
074
075import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
079
080@Category({ RPCTests.class, MediumTests.class })
081public class TestSimpleRpcScheduler {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085    HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);
086
087  @Rule
088  public TestName testName = new TestName();
089
090  private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);
091
092  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
093    @Override
094    public InetSocketAddress getListenerAddress() {
095      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
096    }
097  };
098  private Configuration conf;
099
100  @Before
101  public void setUp() {
102    conf = HBaseConfiguration.create();
103  }
104
105  @Test
106  public void testBasic() throws IOException, InterruptedException {
107
108    PriorityFunction qosFunction = mock(PriorityFunction.class);
109    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
110    scheduler.init(CONTEXT);
111    scheduler.start();
112    CallRunner task = createMockTask(HConstants.NORMAL_QOS);
113    task.setStatus(new MonitoredRPCHandlerImpl("test"));
114    scheduler.dispatch(task);
115    verify(task, timeout(10000)).run();
116    scheduler.stop();
117  }
118
119  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
120    try {
121      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
122      ExecutorField.setAccessible(true);
123
124      RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler);
125
126      Field handlerCountField =
127        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
128
129      handlerCountField.setAccessible(true);
130      handlerCountField.set(rpcExecutor, 0);
131
132      Field numCallQueuesField =
133        rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
134
135      numCallQueuesField.setAccessible(true);
136      numCallQueuesField.set(rpcExecutor, 1);
137
138      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass()
139        .getDeclaredField("currentQueueLimit");
140
141      currentQueueLimitField.setAccessible(true);
142      currentQueueLimitField.set(rpcExecutor, 100);
143
144    } catch (NoSuchFieldException e) {
145      LOG.error("No such field exception" + e);
146    } catch (IllegalAccessException e) {
147      LOG.error("Illegal access exception" + e);
148    }
149
150    return scheduler;
151  }
152
153  @Test
154  public void testCallQueueInfo() throws IOException, InterruptedException {
155
156    PriorityFunction qosFunction = mock(PriorityFunction.class);
157    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0);
158
159    scheduler.init(CONTEXT);
160
161    // Set the handlers to zero. So that number of requests in call Queue can be tested
162    scheduler = disableHandlers(scheduler);
163    scheduler.start();
164
165    int totalCallMethods = 10;
166    for (int i = totalCallMethods; i > 0; i--) {
167      CallRunner task = createMockTask(HConstants.NORMAL_QOS);
168      task.setStatus(new MonitoredRPCHandlerImpl("test"));
169      scheduler.dispatch(task);
170    }
171
172    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
173
174    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
175
176      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
177        assertEquals(totalCallMethods,
178          callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
179      }
180
181    }
182
183    scheduler.stop();
184
185  }
186
187  @Test
188  public void testHandlerIsolation() throws IOException, InterruptedException {
189    CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
190    CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
191    CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
192    List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
193    Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
194      HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
195    PriorityFunction qosFunction = mock(PriorityFunction.class);
196    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
197    final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
198    Answer<Void> answerToRun = new Answer<Void>() {
199      @Override
200      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
201        synchronized (handlerThreads) {
202          handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread());
203        }
204        countDownLatch.countDown();
205        return null;
206      }
207    };
208    for (CallRunner task : tasks) {
209      task.setStatus(new MonitoredRPCHandlerImpl("test"));
210      doAnswer(answerToRun).when(task).run();
211    }
212
213    RpcScheduler scheduler =
214      new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS);
215    scheduler.init(CONTEXT);
216    scheduler.start();
217    for (CallRunner task : tasks) {
218      when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task));
219      scheduler.dispatch(task);
220    }
221    for (CallRunner task : tasks) {
222      verify(task, timeout(10000)).run();
223    }
224    scheduler.stop();
225
226    // Tests that these requests are handled by three distinct threads.
227    countDownLatch.await();
228    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
229  }
230
231  private CallRunner createMockTask(int priority) {
232    ServerCall call = mock(ServerCall.class);
233    CallRunner task = mock(CallRunner.class);
234    RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
235    when(task.getRpcCall()).thenReturn(call);
236    when(call.getHeader()).thenReturn(header);
237    return task;
238  }
239
240  @Test
241  public void testRpcScheduler() throws Exception {
242    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
243    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
244  }
245
246  @Test
247  public void testPluggableRpcQueue() throws Exception {
248    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
249      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
250
251    try {
252      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass");
253      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
254    } catch (PluggableRpcQueueNotFound e) {
255      // expected
256    } catch (Exception e) {
257      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
258    }
259
260    try {
261      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
262        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
263      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
264    } catch (PluggableRpcQueueNotFound e) {
265      // expected
266    } catch (Exception e) {
267      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
268    }
269  }
270
271  @Test
272  public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
273    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
274    Configuration schedConf = HBaseConfiguration.create();
275    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
276    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
277      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
278    schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
279
280    PriorityFunction priority = mock(PriorityFunction.class);
281    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
282    SimpleRpcScheduler scheduler =
283      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
284
285    Field f = scheduler.getClass().getDeclaredField("callExecutor");
286    f.setAccessible(true);
287    assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
288  }
289
290  @Test
291  public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
292    String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
293    Configuration schedConf = HBaseConfiguration.create();
294    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
295    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
296      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
297
298    PriorityFunction priority = mock(PriorityFunction.class);
299    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
300    SimpleRpcScheduler scheduler =
301      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
302
303    Field f = scheduler.getClass().getDeclaredField("callExecutor");
304    f.setAccessible(true);
305    assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
306  }
307
308  @Test
309  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
310
311    Configuration schedConf = HBaseConfiguration.create();
312
313    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
314    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
315    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
316      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
317    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
318      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
319
320    PriorityFunction priority = mock(PriorityFunction.class);
321    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
322    SimpleRpcScheduler scheduler =
323      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
324    try {
325      scheduler.start();
326
327      CallRunner putCallTask = mock(CallRunner.class);
328      ServerCall putCall = mock(ServerCall.class);
329      putCall.param =
330        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
331      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
332      when(putCallTask.getRpcCall()).thenReturn(putCall);
333      when(putCall.getHeader()).thenReturn(putHead);
334
335      assertTrue(scheduler.dispatch(putCallTask));
336
337      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
338      scheduler.onConfigurationChange(schedConf);
339      assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
340      waitUntilQueueEmpty(scheduler);
341    } finally {
342      scheduler.stop();
343    }
344  }
345
346  private void testRpcScheduler(final String queueType) throws Exception {
347    testRpcScheduler(queueType, null);
348  }
349
350  private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
351    throws Exception {
352    Configuration schedConf = HBaseConfiguration.create();
353    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
354
355    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
356      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
357    }
358
359    PriorityFunction priority = mock(PriorityFunction.class);
360    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
361
362    RpcScheduler scheduler =
363      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
364    try {
365      scheduler.start();
366
367      CallRunner smallCallTask = mock(CallRunner.class);
368      ServerCall smallCall = mock(ServerCall.class);
369      RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
370      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
371      when(smallCall.getHeader()).thenReturn(smallHead);
372
373      CallRunner largeCallTask = mock(CallRunner.class);
374      ServerCall largeCall = mock(ServerCall.class);
375      RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
376      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
377      when(largeCall.getHeader()).thenReturn(largeHead);
378
379      CallRunner hugeCallTask = mock(CallRunner.class);
380      ServerCall hugeCall = mock(ServerCall.class);
381      RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
382      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
383      when(hugeCall.getHeader()).thenReturn(hugeHead);
384
385      when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L);
386      when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L);
387      when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L);
388
389      final ArrayList<Integer> work = new ArrayList<>();
390      doAnswerTaskExecution(smallCallTask, work, 10, 250);
391      doAnswerTaskExecution(largeCallTask, work, 50, 250);
392      doAnswerTaskExecution(hugeCallTask, work, 100, 250);
393
394      scheduler.dispatch(smallCallTask);
395      scheduler.dispatch(smallCallTask);
396      scheduler.dispatch(smallCallTask);
397      scheduler.dispatch(hugeCallTask);
398      scheduler.dispatch(smallCallTask);
399      scheduler.dispatch(largeCallTask);
400      scheduler.dispatch(smallCallTask);
401      scheduler.dispatch(smallCallTask);
402
403      while (work.size() < 8) {
404        Thread.sleep(100);
405      }
406
407      int seqSum = 0;
408      int totalTime = 0;
409      for (int i = 0; i < work.size(); ++i) {
410        LOG.debug("Request i=" + i + " value=" + work.get(i));
411        seqSum += work.get(i);
412        totalTime += seqSum;
413      }
414      LOG.debug("Total Time: " + totalTime);
415
416      // -> [small small small huge small large small small]
417      // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
418      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
419      if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
420        assertEquals(530, totalTime);
421      } else if (
422        queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)
423          || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)
424      ) {
425        assertEquals(930, totalTime);
426      }
427    } finally {
428      scheduler.stop();
429    }
430  }
431
432  @Test
433  public void testScanQueueWithZeroScanRatio() throws Exception {
434
435    Configuration schedConf = HBaseConfiguration.create();
436    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
437    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
438    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
439
440    PriorityFunction priority = mock(PriorityFunction.class);
441    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
442
443    RpcScheduler scheduler =
444      new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD);
445    assertNotEquals(null, scheduler);
446  }
447
448  @Test
449  public void testScanQueues() throws Exception {
450    Configuration schedConf = HBaseConfiguration.create();
451    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
452    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
453    schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
454
455    PriorityFunction priority = mock(PriorityFunction.class);
456    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
457
458    RpcScheduler scheduler =
459      new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD);
460    try {
461      scheduler.start();
462
463      CallRunner putCallTask = mock(CallRunner.class);
464      ServerCall putCall = mock(ServerCall.class);
465      putCall.param =
466        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
467      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
468      when(putCallTask.getRpcCall()).thenReturn(putCall);
469      when(putCall.getHeader()).thenReturn(putHead);
470      when(putCall.getParam()).thenReturn(putCall.param);
471
472      CallRunner getCallTask = mock(CallRunner.class);
473      ServerCall getCall = mock(ServerCall.class);
474      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
475      when(getCallTask.getRpcCall()).thenReturn(getCall);
476      when(getCall.getHeader()).thenReturn(getHead);
477
478      CallRunner scanCallTask = mock(CallRunner.class);
479      ServerCall scanCall = mock(ServerCall.class);
480      scanCall.param = ScanRequest.newBuilder().build();
481      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
482      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
483      when(scanCall.getHeader()).thenReturn(scanHead);
484      when(scanCall.getParam()).thenReturn(scanCall.param);
485
486      CallRunner bulkLoadCallTask = mock(CallRunner.class);
487      ServerCall bulkLoadCall = mock(ServerCall.class);
488      bulkLoadCall.param = ScanRequest.newBuilder().build();
489      RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build();
490      when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
491      when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
492      when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
493
494      ArrayList<Integer> work = new ArrayList<>();
495      doAnswerTaskExecution(putCallTask, work, 1, 1000);
496      doAnswerTaskExecution(getCallTask, work, 2, 1000);
497      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
498      doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
499
500      // There are 3 queues: [puts], [gets], [scans], [bulkload]
501      // so the calls will be interleaved
502      scheduler.dispatch(putCallTask);
503      scheduler.dispatch(putCallTask);
504      scheduler.dispatch(putCallTask);
505      scheduler.dispatch(getCallTask);
506      scheduler.dispatch(getCallTask);
507      scheduler.dispatch(getCallTask);
508      scheduler.dispatch(scanCallTask);
509      scheduler.dispatch(scanCallTask);
510      scheduler.dispatch(scanCallTask);
511      scheduler.dispatch(bulkLoadCallTask);
512      scheduler.dispatch(bulkLoadCallTask);
513      scheduler.dispatch(bulkLoadCallTask);
514      while (work.size() < 6) {
515        Thread.sleep(100);
516      }
517
518      for (int i = 0; i < work.size() - 2; i += 3) {
519        assertNotEquals(work.get(i + 0), work.get(i + 1));
520        assertNotEquals(work.get(i + 0), work.get(i + 2));
521        assertNotEquals(work.get(i + 1), work.get(i + 2));
522      }
523    } finally {
524      scheduler.stop();
525    }
526  }
527
528  private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
529    final int value, final int sleepInterval) {
530    callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
531    doAnswer(new Answer<Object>() {
532      @Override
533      public Object answer(InvocationOnMock invocation) {
534        synchronized (results) {
535          results.add(value);
536        }
537        Threads.sleepWithoutInterrupt(sleepInterval);
538        return null;
539      }
540    }).when(callTask).run();
541  }
542
543  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
544    throws InterruptedException {
545    while (scheduler.getGeneralQueueLength() > 0) {
546      Thread.sleep(100);
547    }
548  }
549
550  @Test
551  public void testSoftAndHardQueueLimits() throws Exception {
552
553    Configuration schedConf = HBaseConfiguration.create();
554
555    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
556    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
557    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
558      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
559
560    PriorityFunction priority = mock(PriorityFunction.class);
561    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
562    SimpleRpcScheduler scheduler =
563      new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
564    try {
565      scheduler.start();
566
567      CallRunner putCallTask = mock(CallRunner.class);
568      ServerCall putCall = mock(ServerCall.class);
569      putCall.param =
570        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
571      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
572      when(putCallTask.getRpcCall()).thenReturn(putCall);
573      when(putCall.getHeader()).thenReturn(putHead);
574
575      assertTrue(scheduler.dispatch(putCallTask));
576
577      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
578      scheduler.onConfigurationChange(schedConf);
579      assertFalse(scheduler.dispatch(putCallTask));
580      waitUntilQueueEmpty(scheduler);
581      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
582      scheduler.onConfigurationChange(schedConf);
583      assertTrue(scheduler.dispatch(putCallTask));
584    } finally {
585      scheduler.stop();
586    }
587  }
588
589  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
590
591    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
592
593    private long offset;
594
595    private final Set<String> threadNamePrefixs = new HashSet<>();
596
597    @Override
598    public long currentTime() {
599      for (String threadNamePrefix : threadNamePrefixs) {
600        String threadName = Thread.currentThread().getName();
601        if (threadName.startsWith(threadNamePrefix)) {
602          if (timeQ != null) {
603            Long qTime = timeQ.poll();
604            if (qTime != null) {
605              return qTime.longValue() + offset;
606            }
607          }
608        }
609      }
610      return System.currentTimeMillis();
611    }
612  }
613
614  // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay
615  // from the get go. So we are always overloaded. The test below would seem to complete the
616  // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping
617  // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for
618  // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e.
619  // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath
620  // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount,
621  // numCallQueues... Codel is hard to test. This test is going to be flakey given it all
622  // timer-based. Disabling for now till chat with authors.
623  @Test
624  public void testCoDelScheduling() throws Exception {
625    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
626    envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
627    Configuration schedConf = HBaseConfiguration.create();
628    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
629    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
630      RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
631    PriorityFunction priority = mock(PriorityFunction.class);
632    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
633    SimpleRpcScheduler scheduler =
634      new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD);
635    try {
636      // Loading mocked call runner can take a good amount of time the first time through
637      // (haven't looked why). Load it for first time here outside of the timed loop.
638      getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2);
639      scheduler.start();
640      EnvironmentEdgeManager.injectEdge(envEdge);
641      envEdge.offset = 5;
642      // Calls faster than min delay
643      // LOG.info("Start");
644      for (int i = 0; i < 100; i++) {
645        long time = EnvironmentEdgeManager.currentTime();
646        envEdge.timeQ.put(time);
647        CallRunner cr = getMockedCallRunner(time, 2);
648        scheduler.dispatch(cr);
649      }
650      // LOG.info("Loop done");
651      // make sure fast calls are handled
652      waitUntilQueueEmpty(scheduler);
653      Thread.sleep(100);
654      assertEquals("None of these calls should have been discarded", 0,
655        scheduler.getNumGeneralCallsDropped());
656
657      envEdge.offset = 151;
658      // calls slower than min delay, but not individually slow enough to be dropped
659      for (int i = 0; i < 20; i++) {
660        long time = EnvironmentEdgeManager.currentTime();
661        envEdge.timeQ.put(time);
662        CallRunner cr = getMockedCallRunner(time, 2);
663        scheduler.dispatch(cr);
664      }
665
666      // make sure somewhat slow calls are handled
667      waitUntilQueueEmpty(scheduler);
668      Thread.sleep(100);
669      assertEquals("None of these calls should have been discarded", 0,
670        scheduler.getNumGeneralCallsDropped());
671
672      envEdge.offset = 2000;
673      // now slow calls and the ones to be dropped
674      for (int i = 0; i < 60; i++) {
675        long time = EnvironmentEdgeManager.currentTime();
676        envEdge.timeQ.put(time);
677        CallRunner cr = getMockedCallRunner(time, 100);
678        scheduler.dispatch(cr);
679      }
680
681      // make sure somewhat slow calls are handled
682      waitUntilQueueEmpty(scheduler);
683      Thread.sleep(100);
684      assertTrue("There should have been at least 12 calls dropped however there were "
685        + scheduler.getNumGeneralCallsDropped(), scheduler.getNumGeneralCallsDropped() > 12);
686    } finally {
687      scheduler.stop();
688    }
689  }
690
691  @Test
692  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
693    String name = testName.getMethodName();
694    int handlerCount = 1;
695    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
696    int maxQueueLength = 0;
697    PriorityFunction priority = mock(PriorityFunction.class);
698    Configuration conf = HBaseConfiguration.create();
699    Abortable abortable = mock(Abortable.class);
700    FastPathBalancedQueueRpcExecutor executor =
701      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType,
702        maxQueueLength, priority, conf, abortable));
703    CallRunner task = mock(CallRunner.class);
704    assertFalse(executor.dispatch(task));
705    // make sure we never internally get a handler, which would skip the queue validation
706    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
707      Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
708  }
709
710  @Test
711  public void testMetaRWScanQueues() throws Exception {
712    Configuration schedConf = HBaseConfiguration.create();
713    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
714    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
715    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
716
717    PriorityFunction priority = mock(PriorityFunction.class);
718    when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
719
720    RpcScheduler scheduler =
721      new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD);
722    try {
723      scheduler.start();
724
725      CallRunner putCallTask = mock(CallRunner.class);
726      ServerCall putCall = mock(ServerCall.class);
727      putCall.param =
728        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
729      RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
730      when(putCallTask.getRpcCall()).thenReturn(putCall);
731      when(putCall.getHeader()).thenReturn(putHead);
732      when(putCall.getParam()).thenReturn(putCall.param);
733
734      CallRunner clientReadCallTask = mock(CallRunner.class);
735      ServerCall clientReadCall = mock(ServerCall.class);
736      RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
737      when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
738      when(clientReadCall.getHeader()).thenReturn(clientReadHead);
739
740      CallRunner internalReadCallTask = mock(CallRunner.class);
741      ServerCall internalReadCall = mock(ServerCall.class);
742      internalReadCall.param = ScanRequest.newBuilder().build();
743      RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan")
744        .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build();
745      when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
746      when(internalReadCall.getHeader()).thenReturn(masterReadHead);
747      when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
748
749      ArrayList<Integer> work = new ArrayList<>();
750      doAnswerTaskExecution(putCallTask, work, 1, 1000);
751      doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
752      doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
753
754      // There are 3 queues: [puts], [gets], [scans]
755      // so the calls will be interleaved
756      scheduler.dispatch(putCallTask);
757      scheduler.dispatch(putCallTask);
758      scheduler.dispatch(putCallTask);
759      scheduler.dispatch(clientReadCallTask);
760      scheduler.dispatch(clientReadCallTask);
761      scheduler.dispatch(clientReadCallTask);
762      scheduler.dispatch(internalReadCallTask);
763      scheduler.dispatch(internalReadCallTask);
764      scheduler.dispatch(internalReadCallTask);
765
766      while (work.size() < 6) {
767        Thread.sleep(100);
768      }
769
770      for (int i = 0; i < work.size() - 2; i += 3) {
771        assertNotEquals(work.get(i + 0), work.get(i + 1));
772        assertNotEquals(work.get(i + 0), work.get(i + 2));
773        assertNotEquals(work.get(i + 1), work.get(i + 2));
774      }
775    } finally {
776      scheduler.stop();
777    }
778  }
779
780  // Get mocked call that has the CallRunner sleep for a while so that the fast
781  // path isn't hit.
782  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
783    ServerCall putCall = new ServerCall(1, null, null,
784      RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
785      RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
786      null, null, 9, null, timestamp, 0, null, null, null) {
787
788      @Override
789      public void sendResponseIfReady() throws IOException {
790      }
791    };
792
793    return new CallRunner(null, putCall) {
794      @Override
795      public void run() {
796        if (sleepTime <= 0) {
797          return;
798        }
799        try {
800          LOG.warn("Sleeping for " + sleepTime);
801          Thread.sleep(sleepTime);
802          LOG.warn("Done Sleeping for " + sleepTime);
803        } catch (InterruptedException e) {
804        }
805      }
806
807      @Override
808      public RpcCall getRpcCall() {
809        return putCall;
810      }
811
812      @Override
813      public void drop() {
814      }
815    };
816  }
817}