001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.timeout; 030import static org.mockito.Mockito.verify; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.lang.reflect.Field; 035import java.net.InetSocketAddress; 036import java.util.ArrayList; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.BlockingQueue; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.LinkedBlockingQueue; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.Abortable; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseConfiguration; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RPCTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.EnvironmentEdge; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.Threads; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.mockito.Mockito; 064import org.mockito.invocation.InvocationOnMock; 065import org.mockito.stubbing.Answer; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 072import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 073 074import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 078 079@Category({ RPCTests.class, MediumTests.class }) 080public class TestSimpleRpcScheduler { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class); 085 086 @Rule 087 public TestName testName = new TestName(); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 090 091 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 092 @Override 093 public InetSocketAddress getListenerAddress() { 094 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 095 } 096 }; 097 private Configuration conf; 098 099 @Before 100 public void setUp() { 101 conf = HBaseConfiguration.create(); 102 } 103 104 @Test 105 public void testBasic() throws IOException, InterruptedException { 106 107 PriorityFunction qosFunction = mock(PriorityFunction.class); 108 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0); 109 scheduler.init(CONTEXT); 110 scheduler.start(); 111 CallRunner task = createMockTask(); 112 task.setStatus(new MonitoredRPCHandlerImpl()); 113 scheduler.dispatch(task); 114 verify(task, timeout(10000)).run(); 115 scheduler.stop(); 116 } 117 118 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 119 try { 120 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 121 ExecutorField.setAccessible(true); 122 123 RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler); 124 125 Field handlerCountField = 126 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount"); 127 128 handlerCountField.setAccessible(true); 129 handlerCountField.set(rpcExecutor, 0); 130 131 Field numCallQueuesField = 132 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues"); 133 134 numCallQueuesField.setAccessible(true); 135 numCallQueuesField.set(rpcExecutor, 1); 136 137 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass() 138 .getDeclaredField("currentQueueLimit"); 139 140 currentQueueLimitField.setAccessible(true); 141 currentQueueLimitField.set(rpcExecutor, 100); 142 143 } catch (NoSuchFieldException e) { 144 LOG.error("No such field exception" + e); 145 } catch (IllegalAccessException e) { 146 LOG.error("Illegal access exception" + e); 147 } 148 149 return scheduler; 150 } 151 152 @Test 153 public void testCallQueueInfo() throws IOException, InterruptedException { 154 155 PriorityFunction qosFunction = mock(PriorityFunction.class); 156 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0); 157 158 scheduler.init(CONTEXT); 159 160 // Set the handlers to zero. So that number of requests in call Queue can be tested 161 scheduler = disableHandlers(scheduler); 162 scheduler.start(); 163 164 int totalCallMethods = 10; 165 for (int i = totalCallMethods; i > 0; i--) { 166 CallRunner task = createMockTask(); 167 task.setStatus(new MonitoredRPCHandlerImpl()); 168 scheduler.dispatch(task); 169 } 170 171 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 172 173 for (String callQueueName : callQueueInfo.getCallQueueNames()) { 174 175 for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) { 176 assertEquals(totalCallMethods, 177 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 178 } 179 180 } 181 182 scheduler.stop(); 183 184 } 185 186 @Test 187 public void testHandlerIsolation() throws IOException, InterruptedException { 188 CallRunner generalTask = createMockTask(); 189 CallRunner priorityTask = createMockTask(); 190 CallRunner replicationTask = createMockTask(); 191 List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask); 192 Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask, 193 HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS); 194 PriorityFunction qosFunction = mock(PriorityFunction.class); 195 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 196 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 197 Answer<Void> answerToRun = new Answer<Void>() { 198 @Override 199 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 200 synchronized (handlerThreads) { 201 handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread()); 202 } 203 countDownLatch.countDown(); 204 return null; 205 } 206 }; 207 for (CallRunner task : tasks) { 208 task.setStatus(new MonitoredRPCHandlerImpl()); 209 doAnswer(answerToRun).when(task).run(); 210 } 211 212 RpcScheduler scheduler = 213 new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS); 214 scheduler.init(CONTEXT); 215 scheduler.start(); 216 for (CallRunner task : tasks) { 217 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 218 scheduler.dispatch(task); 219 } 220 for (CallRunner task : tasks) { 221 verify(task, timeout(10000)).run(); 222 } 223 scheduler.stop(); 224 225 // Tests that these requests are handled by three distinct threads. 226 countDownLatch.await(); 227 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 228 } 229 230 private CallRunner createMockTask() { 231 ServerCall call = mock(ServerCall.class); 232 CallRunner task = mock(CallRunner.class); 233 when(task.getRpcCall()).thenReturn(call); 234 return task; 235 } 236 237 @Test 238 public void testRpcScheduler() throws Exception { 239 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 240 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 241 } 242 243 @Test 244 public void testPluggableRpcQueue() throws Exception { 245 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 246 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 247 248 try { 249 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass"); 250 fail("Expected a PluggableRpcQueueNotFound for unloaded class"); 251 } catch (PluggableRpcQueueNotFound e) { 252 // expected 253 } catch (Exception e) { 254 fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e); 255 } 256 257 try { 258 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 259 "org.apache.hadoop.hbase.ipc.SimpleRpcServer"); 260 fail("Expected a PluggableRpcQueueNotFound for incompatible class"); 261 } catch (PluggableRpcQueueNotFound e) { 262 // expected 263 } catch (Exception e) { 264 fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e); 265 } 266 } 267 268 @Test 269 public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception { 270 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 271 Configuration schedConf = HBaseConfiguration.create(); 272 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 273 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 274 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 275 schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true); 276 277 PriorityFunction priority = mock(PriorityFunction.class); 278 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 279 SimpleRpcScheduler scheduler = 280 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 281 282 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 283 f.setAccessible(true); 284 assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor); 285 } 286 287 @Test 288 public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception { 289 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 290 Configuration schedConf = HBaseConfiguration.create(); 291 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 292 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 293 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 294 295 PriorityFunction priority = mock(PriorityFunction.class); 296 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 297 SimpleRpcScheduler scheduler = 298 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 299 300 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 301 f.setAccessible(true); 302 assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor); 303 } 304 305 @Test 306 public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { 307 308 Configuration schedConf = HBaseConfiguration.create(); 309 310 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); 311 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 312 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 313 RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); 314 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 315 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 316 317 PriorityFunction priority = mock(PriorityFunction.class); 318 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 319 SimpleRpcScheduler scheduler = 320 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 321 try { 322 scheduler.start(); 323 324 CallRunner putCallTask = mock(CallRunner.class); 325 ServerCall putCall = mock(ServerCall.class); 326 putCall.param = 327 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 328 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 329 when(putCallTask.getRpcCall()).thenReturn(putCall); 330 when(putCall.getHeader()).thenReturn(putHead); 331 332 assertTrue(scheduler.dispatch(putCallTask)); 333 334 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4); 335 scheduler.onConfigurationChange(schedConf); 336 assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange()); 337 waitUntilQueueEmpty(scheduler); 338 } finally { 339 scheduler.stop(); 340 } 341 } 342 343 private void testRpcScheduler(final String queueType) throws Exception { 344 testRpcScheduler(queueType, null); 345 } 346 347 private void testRpcScheduler(final String queueType, final String pluggableQueueClass) 348 throws Exception { 349 Configuration schedConf = HBaseConfiguration.create(); 350 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 351 352 if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) { 353 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass); 354 } 355 356 PriorityFunction priority = mock(PriorityFunction.class); 357 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 358 359 RpcScheduler scheduler = 360 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 361 try { 362 scheduler.start(); 363 364 CallRunner smallCallTask = mock(CallRunner.class); 365 ServerCall smallCall = mock(ServerCall.class); 366 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 367 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 368 when(smallCall.getHeader()).thenReturn(smallHead); 369 370 CallRunner largeCallTask = mock(CallRunner.class); 371 ServerCall largeCall = mock(ServerCall.class); 372 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 373 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 374 when(largeCall.getHeader()).thenReturn(largeHead); 375 376 CallRunner hugeCallTask = mock(CallRunner.class); 377 ServerCall hugeCall = mock(ServerCall.class); 378 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 379 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 380 when(hugeCall.getHeader()).thenReturn(hugeHead); 381 382 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 383 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 384 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 385 386 final ArrayList<Integer> work = new ArrayList<>(); 387 doAnswerTaskExecution(smallCallTask, work, 10, 250); 388 doAnswerTaskExecution(largeCallTask, work, 50, 250); 389 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 390 391 scheduler.dispatch(smallCallTask); 392 scheduler.dispatch(smallCallTask); 393 scheduler.dispatch(smallCallTask); 394 scheduler.dispatch(hugeCallTask); 395 scheduler.dispatch(smallCallTask); 396 scheduler.dispatch(largeCallTask); 397 scheduler.dispatch(smallCallTask); 398 scheduler.dispatch(smallCallTask); 399 400 while (work.size() < 8) { 401 Thread.sleep(100); 402 } 403 404 int seqSum = 0; 405 int totalTime = 0; 406 for (int i = 0; i < work.size(); ++i) { 407 LOG.debug("Request i=" + i + " value=" + work.get(i)); 408 seqSum += work.get(i); 409 totalTime += seqSum; 410 } 411 LOG.debug("Total Time: " + totalTime); 412 413 // -> [small small small huge small large small small] 414 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 415 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 416 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 417 assertEquals(530, totalTime); 418 } else if ( 419 queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) 420 || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE) 421 ) { 422 assertEquals(930, totalTime); 423 } 424 } finally { 425 scheduler.stop(); 426 } 427 } 428 429 @Test 430 public void testScanQueueWithZeroScanRatio() throws Exception { 431 432 Configuration schedConf = HBaseConfiguration.create(); 433 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 434 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 435 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 436 437 PriorityFunction priority = mock(PriorityFunction.class); 438 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 439 440 RpcScheduler scheduler = 441 new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD); 442 assertNotEquals(null, scheduler); 443 } 444 445 @Test 446 public void testScanQueues() throws Exception { 447 Configuration schedConf = HBaseConfiguration.create(); 448 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 449 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 450 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 451 452 PriorityFunction priority = mock(PriorityFunction.class); 453 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 454 455 RpcScheduler scheduler = 456 new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); 457 try { 458 scheduler.start(); 459 460 CallRunner putCallTask = mock(CallRunner.class); 461 ServerCall putCall = mock(ServerCall.class); 462 putCall.param = 463 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 464 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 465 when(putCallTask.getRpcCall()).thenReturn(putCall); 466 when(putCall.getHeader()).thenReturn(putHead); 467 when(putCall.getParam()).thenReturn(putCall.param); 468 469 CallRunner getCallTask = mock(CallRunner.class); 470 ServerCall getCall = mock(ServerCall.class); 471 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 472 when(getCallTask.getRpcCall()).thenReturn(getCall); 473 when(getCall.getHeader()).thenReturn(getHead); 474 475 CallRunner scanCallTask = mock(CallRunner.class); 476 ServerCall scanCall = mock(ServerCall.class); 477 scanCall.param = ScanRequest.newBuilder().build(); 478 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 479 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 480 when(scanCall.getHeader()).thenReturn(scanHead); 481 when(scanCall.getParam()).thenReturn(scanCall.param); 482 483 ArrayList<Integer> work = new ArrayList<>(); 484 doAnswerTaskExecution(putCallTask, work, 1, 1000); 485 doAnswerTaskExecution(getCallTask, work, 2, 1000); 486 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 487 488 // There are 3 queues: [puts], [gets], [scans] 489 // so the calls will be interleaved 490 scheduler.dispatch(putCallTask); 491 scheduler.dispatch(putCallTask); 492 scheduler.dispatch(putCallTask); 493 scheduler.dispatch(getCallTask); 494 scheduler.dispatch(getCallTask); 495 scheduler.dispatch(getCallTask); 496 scheduler.dispatch(scanCallTask); 497 scheduler.dispatch(scanCallTask); 498 scheduler.dispatch(scanCallTask); 499 500 while (work.size() < 6) { 501 Thread.sleep(100); 502 } 503 504 for (int i = 0; i < work.size() - 2; i += 3) { 505 assertNotEquals(work.get(i + 0), work.get(i + 1)); 506 assertNotEquals(work.get(i + 0), work.get(i + 2)); 507 assertNotEquals(work.get(i + 1), work.get(i + 2)); 508 } 509 } finally { 510 scheduler.stop(); 511 } 512 } 513 514 private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results, 515 final int value, final int sleepInterval) { 516 callTask.setStatus(new MonitoredRPCHandlerImpl()); 517 doAnswer(new Answer<Object>() { 518 @Override 519 public Object answer(InvocationOnMock invocation) { 520 synchronized (results) { 521 results.add(value); 522 } 523 Threads.sleepWithoutInterrupt(sleepInterval); 524 return null; 525 } 526 }).when(callTask).run(); 527 } 528 529 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 530 throws InterruptedException { 531 while (scheduler.getGeneralQueueLength() > 0) { 532 Thread.sleep(100); 533 } 534 } 535 536 @Test 537 public void testSoftAndHardQueueLimits() throws Exception { 538 539 Configuration schedConf = HBaseConfiguration.create(); 540 541 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 542 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 543 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 544 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 545 546 PriorityFunction priority = mock(PriorityFunction.class); 547 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 548 SimpleRpcScheduler scheduler = 549 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 550 try { 551 scheduler.start(); 552 553 CallRunner putCallTask = mock(CallRunner.class); 554 ServerCall putCall = mock(ServerCall.class); 555 putCall.param = 556 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 557 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 558 when(putCallTask.getRpcCall()).thenReturn(putCall); 559 when(putCall.getHeader()).thenReturn(putHead); 560 561 assertTrue(scheduler.dispatch(putCallTask)); 562 563 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 564 scheduler.onConfigurationChange(schedConf); 565 assertFalse(scheduler.dispatch(putCallTask)); 566 waitUntilQueueEmpty(scheduler); 567 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 568 scheduler.onConfigurationChange(schedConf); 569 assertTrue(scheduler.dispatch(putCallTask)); 570 } finally { 571 scheduler.stop(); 572 } 573 } 574 575 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 576 577 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 578 579 private long offset; 580 581 private final Set<String> threadNamePrefixs = new HashSet<>(); 582 583 @Override 584 public long currentTime() { 585 for (String threadNamePrefix : threadNamePrefixs) { 586 String threadName = Thread.currentThread().getName(); 587 if (threadName.startsWith(threadNamePrefix)) { 588 if (timeQ != null) { 589 Long qTime = timeQ.poll(); 590 if (qTime != null) { 591 return qTime.longValue() + offset; 592 } 593 } 594 } 595 } 596 return System.currentTimeMillis(); 597 } 598 } 599 600 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay 601 // from the get go. So we are always overloaded. The test below would seem to complete the 602 // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping 603 // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for 604 // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e. 605 // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath 606 // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, 607 // numCallQueues... Codel is hard to test. This test is going to be flakey given it all 608 // timer-based. Disabling for now till chat with authors. 609 @Test 610 public void testCoDelScheduling() throws Exception { 611 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 612 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 613 Configuration schedConf = HBaseConfiguration.create(); 614 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 615 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 616 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 617 PriorityFunction priority = mock(PriorityFunction.class); 618 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 619 SimpleRpcScheduler scheduler = 620 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 621 try { 622 // Loading mocked call runner can take a good amount of time the first time through 623 // (haven't looked why). Load it for first time here outside of the timed loop. 624 getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2); 625 scheduler.start(); 626 EnvironmentEdgeManager.injectEdge(envEdge); 627 envEdge.offset = 5; 628 // Calls faster than min delay 629 // LOG.info("Start"); 630 for (int i = 0; i < 100; i++) { 631 long time = EnvironmentEdgeManager.currentTime(); 632 envEdge.timeQ.put(time); 633 CallRunner cr = getMockedCallRunner(time, 2); 634 scheduler.dispatch(cr); 635 } 636 // LOG.info("Loop done"); 637 // make sure fast calls are handled 638 waitUntilQueueEmpty(scheduler); 639 Thread.sleep(100); 640 assertEquals("None of these calls should have been discarded", 0, 641 scheduler.getNumGeneralCallsDropped()); 642 643 envEdge.offset = 151; 644 // calls slower than min delay, but not individually slow enough to be dropped 645 for (int i = 0; i < 20; i++) { 646 long time = EnvironmentEdgeManager.currentTime(); 647 envEdge.timeQ.put(time); 648 CallRunner cr = getMockedCallRunner(time, 2); 649 scheduler.dispatch(cr); 650 } 651 652 // make sure somewhat slow calls are handled 653 waitUntilQueueEmpty(scheduler); 654 Thread.sleep(100); 655 assertEquals("None of these calls should have been discarded", 0, 656 scheduler.getNumGeneralCallsDropped()); 657 658 envEdge.offset = 2000; 659 // now slow calls and the ones to be dropped 660 for (int i = 0; i < 60; i++) { 661 long time = EnvironmentEdgeManager.currentTime(); 662 envEdge.timeQ.put(time); 663 CallRunner cr = getMockedCallRunner(time, 100); 664 scheduler.dispatch(cr); 665 } 666 667 // make sure somewhat slow calls are handled 668 waitUntilQueueEmpty(scheduler); 669 Thread.sleep(100); 670 assertTrue("There should have been at least 12 calls dropped however there were " 671 + scheduler.getNumGeneralCallsDropped(), scheduler.getNumGeneralCallsDropped() > 12); 672 } finally { 673 scheduler.stop(); 674 } 675 } 676 677 @Test 678 public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { 679 String name = testName.getMethodName(); 680 int handlerCount = 1; 681 String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; 682 int maxQueueLength = 0; 683 PriorityFunction priority = mock(PriorityFunction.class); 684 Configuration conf = HBaseConfiguration.create(); 685 Abortable abortable = mock(Abortable.class); 686 FastPathBalancedQueueRpcExecutor executor = 687 Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, 688 maxQueueLength, priority, conf, abortable)); 689 CallRunner task = mock(CallRunner.class); 690 assertFalse(executor.dispatch(task)); 691 // make sure we never internally get a handler, which would skip the queue validation 692 Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), 693 Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 694 } 695 696 @Test 697 public void testMetaRWScanQueues() throws Exception { 698 Configuration schedConf = HBaseConfiguration.create(); 699 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 700 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 701 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 702 703 PriorityFunction priority = mock(PriorityFunction.class); 704 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 705 706 RpcScheduler scheduler = 707 new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD); 708 try { 709 scheduler.start(); 710 711 CallRunner putCallTask = mock(CallRunner.class); 712 ServerCall putCall = mock(ServerCall.class); 713 putCall.param = 714 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 715 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 716 when(putCallTask.getRpcCall()).thenReturn(putCall); 717 when(putCall.getHeader()).thenReturn(putHead); 718 when(putCall.getParam()).thenReturn(putCall.param); 719 720 CallRunner getCallTask = mock(CallRunner.class); 721 ServerCall getCall = mock(ServerCall.class); 722 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 723 when(getCallTask.getRpcCall()).thenReturn(getCall); 724 when(getCall.getHeader()).thenReturn(getHead); 725 726 CallRunner scanCallTask = mock(CallRunner.class); 727 ServerCall scanCall = mock(ServerCall.class); 728 scanCall.param = ScanRequest.newBuilder().build(); 729 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 730 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 731 when(scanCall.getHeader()).thenReturn(scanHead); 732 when(scanCall.getParam()).thenReturn(scanCall.param); 733 734 ArrayList<Integer> work = new ArrayList<>(); 735 doAnswerTaskExecution(putCallTask, work, 1, 1000); 736 doAnswerTaskExecution(getCallTask, work, 2, 1000); 737 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 738 739 // There are 3 queues: [puts], [gets], [scans] 740 // so the calls will be interleaved 741 scheduler.dispatch(putCallTask); 742 scheduler.dispatch(putCallTask); 743 scheduler.dispatch(putCallTask); 744 scheduler.dispatch(getCallTask); 745 scheduler.dispatch(getCallTask); 746 scheduler.dispatch(getCallTask); 747 scheduler.dispatch(scanCallTask); 748 scheduler.dispatch(scanCallTask); 749 scheduler.dispatch(scanCallTask); 750 751 while (work.size() < 6) { 752 Thread.sleep(100); 753 } 754 755 for (int i = 0; i < work.size() - 2; i += 3) { 756 assertNotEquals(work.get(i + 0), work.get(i + 1)); 757 assertNotEquals(work.get(i + 0), work.get(i + 2)); 758 assertNotEquals(work.get(i + 1), work.get(i + 2)); 759 } 760 } finally { 761 scheduler.stop(); 762 } 763 } 764 765 // Get mocked call that has the CallRunner sleep for a while so that the fast 766 // path isn't hit. 767 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 768 ServerCall putCall = new ServerCall(1, null, null, 769 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 770 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 771 null, null, 9, null, timestamp, 0, null, null, null) { 772 773 @Override 774 public void sendResponseIfReady() throws IOException { 775 } 776 }; 777 778 return new CallRunner(null, putCall) { 779 @Override 780 public void run() { 781 if (sleepTime <= 0) { 782 return; 783 } 784 try { 785 LOG.warn("Sleeping for " + sleepTime); 786 Thread.sleep(sleepTime); 787 LOG.warn("Done Sleeping for " + sleepTime); 788 } catch (InterruptedException e) { 789 } 790 } 791 792 @Override 793 public RpcCall getRpcCall() { 794 return putCall; 795 } 796 797 @Override 798 public void drop() { 799 } 800 }; 801 } 802}