001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.timeout; 030import static org.mockito.Mockito.verify; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.lang.reflect.Field; 035import java.net.InetSocketAddress; 036import java.util.ArrayList; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.BlockingQueue; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.LinkedBlockingQueue; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.Abortable; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import 
org.apache.hadoop.hbase.HBaseConfiguration; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RPCTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.EnvironmentEdge; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.Threads; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.mockito.Mockito; 064import org.mockito.invocation.InvocationOnMock; 065import org.mockito.stubbing.Answer; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 072import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 073 074import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 078 079@Category({ RPCTests.class, MediumTests.class }) 080public class TestSimpleRpcScheduler { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class); 085 086 @Rule 087 public TestName testName = new TestName(); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 090 091 private final RpcScheduler.Context CONTEXT 
= new RpcScheduler.Context() { 092 @Override 093 public InetSocketAddress getListenerAddress() { 094 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 095 } 096 }; 097 private Configuration conf; 098 099 @Before 100 public void setUp() { 101 conf = HBaseConfiguration.create(); 102 } 103 104 @Test 105 public void testBasic() throws IOException, InterruptedException { 106 107 PriorityFunction qosFunction = mock(PriorityFunction.class); 108 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0); 109 scheduler.init(CONTEXT); 110 scheduler.start(); 111 CallRunner task = createMockTask(); 112 task.setStatus(new MonitoredRPCHandlerImpl("test")); 113 scheduler.dispatch(task); 114 verify(task, timeout(10000)).run(); 115 scheduler.stop(); 116 } 117 118 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 119 try { 120 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 121 ExecutorField.setAccessible(true); 122 123 RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler); 124 125 Field handlerCountField = 126 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount"); 127 128 handlerCountField.setAccessible(true); 129 handlerCountField.set(rpcExecutor, 0); 130 131 Field numCallQueuesField = 132 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues"); 133 134 numCallQueuesField.setAccessible(true); 135 numCallQueuesField.set(rpcExecutor, 1); 136 137 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass() 138 .getDeclaredField("currentQueueLimit"); 139 140 currentQueueLimitField.setAccessible(true); 141 currentQueueLimitField.set(rpcExecutor, 100); 142 143 } catch (NoSuchFieldException e) { 144 LOG.error("No such field exception" + e); 145 } catch (IllegalAccessException e) { 146 LOG.error("Illegal access exception" + e); 147 } 148 149 return scheduler; 150 } 151 152 @Test 153 public void 
testCallQueueInfo() throws IOException, InterruptedException { 154 155 PriorityFunction qosFunction = mock(PriorityFunction.class); 156 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0); 157 158 scheduler.init(CONTEXT); 159 160 // Set the handlers to zero. So that number of requests in call Queue can be tested 161 scheduler = disableHandlers(scheduler); 162 scheduler.start(); 163 164 int totalCallMethods = 10; 165 for (int i = totalCallMethods; i > 0; i--) { 166 CallRunner task = createMockTask(); 167 task.setStatus(new MonitoredRPCHandlerImpl("test")); 168 scheduler.dispatch(task); 169 } 170 171 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 172 173 for (String callQueueName : callQueueInfo.getCallQueueNames()) { 174 175 for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) { 176 assertEquals(totalCallMethods, 177 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 178 } 179 180 } 181 182 scheduler.stop(); 183 184 } 185 186 @Test 187 public void testHandlerIsolation() throws IOException, InterruptedException { 188 CallRunner generalTask = createMockTask(); 189 CallRunner priorityTask = createMockTask(); 190 CallRunner replicationTask = createMockTask(); 191 List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask); 192 Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask, 193 HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS); 194 PriorityFunction qosFunction = mock(PriorityFunction.class); 195 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 196 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 197 Answer<Void> answerToRun = new Answer<Void>() { 198 @Override 199 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 200 synchronized (handlerThreads) { 201 handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread()); 202 } 203 
countDownLatch.countDown(); 204 return null; 205 } 206 }; 207 for (CallRunner task : tasks) { 208 task.setStatus(new MonitoredRPCHandlerImpl("test")); 209 doAnswer(answerToRun).when(task).run(); 210 } 211 212 RpcScheduler scheduler = 213 new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS); 214 scheduler.init(CONTEXT); 215 scheduler.start(); 216 for (CallRunner task : tasks) { 217 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 218 scheduler.dispatch(task); 219 } 220 for (CallRunner task : tasks) { 221 verify(task, timeout(10000)).run(); 222 } 223 scheduler.stop(); 224 225 // Tests that these requests are handled by three distinct threads. 226 countDownLatch.await(); 227 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 228 } 229 230 private CallRunner createMockTask() { 231 ServerCall call = mock(ServerCall.class); 232 CallRunner task = mock(CallRunner.class); 233 when(task.getRpcCall()).thenReturn(call); 234 return task; 235 } 236 237 @Test 238 public void testRpcScheduler() throws Exception { 239 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 240 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 241 } 242 243 @Test 244 public void testPluggableRpcQueue() throws Exception { 245 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 246 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 247 248 try { 249 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass"); 250 fail("Expected a PluggableRpcQueueNotFound for unloaded class"); 251 } catch (PluggableRpcQueueNotFound e) { 252 // expected 253 } catch (Exception e) { 254 fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e); 255 } 256 257 try { 258 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 259 "org.apache.hadoop.hbase.ipc.SimpleRpcServer"); 260 fail("Expected a PluggableRpcQueueNotFound for incompatible class"); 261 } 
catch (PluggableRpcQueueNotFound e) { 262 // expected 263 } catch (Exception e) { 264 fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e); 265 } 266 } 267 268 @Test 269 public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception { 270 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 271 Configuration schedConf = HBaseConfiguration.create(); 272 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 273 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 274 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 275 schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true); 276 277 PriorityFunction priority = mock(PriorityFunction.class); 278 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 279 SimpleRpcScheduler scheduler = 280 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 281 282 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 283 f.setAccessible(true); 284 assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor); 285 } 286 287 @Test 288 public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception { 289 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 290 Configuration schedConf = HBaseConfiguration.create(); 291 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 292 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 293 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 294 295 PriorityFunction priority = mock(PriorityFunction.class); 296 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 297 SimpleRpcScheduler scheduler = 298 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 299 300 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 301 f.setAccessible(true); 302 assertTrue(f.get(scheduler) instanceof 
BalancedQueueRpcExecutor); 303 } 304 305 @Test 306 public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { 307 308 Configuration schedConf = HBaseConfiguration.create(); 309 310 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); 311 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 312 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 313 RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); 314 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 315 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 316 317 PriorityFunction priority = mock(PriorityFunction.class); 318 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 319 SimpleRpcScheduler scheduler = 320 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 321 try { 322 scheduler.start(); 323 324 CallRunner putCallTask = mock(CallRunner.class); 325 ServerCall putCall = mock(ServerCall.class); 326 putCall.param = 327 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 328 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 329 when(putCallTask.getRpcCall()).thenReturn(putCall); 330 when(putCall.getHeader()).thenReturn(putHead); 331 332 assertTrue(scheduler.dispatch(putCallTask)); 333 334 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4); 335 scheduler.onConfigurationChange(schedConf); 336 assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange()); 337 waitUntilQueueEmpty(scheduler); 338 } finally { 339 scheduler.stop(); 340 } 341 } 342 343 private void testRpcScheduler(final String queueType) throws Exception { 344 testRpcScheduler(queueType, null); 345 } 346 347 private void testRpcScheduler(final String queueType, final String pluggableQueueClass) 348 throws Exception { 349 Configuration schedConf = HBaseConfiguration.create(); 350 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 
351 352 if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) { 353 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass); 354 } 355 356 PriorityFunction priority = mock(PriorityFunction.class); 357 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 358 359 RpcScheduler scheduler = 360 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 361 try { 362 scheduler.start(); 363 364 CallRunner smallCallTask = mock(CallRunner.class); 365 ServerCall smallCall = mock(ServerCall.class); 366 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 367 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 368 when(smallCall.getHeader()).thenReturn(smallHead); 369 370 CallRunner largeCallTask = mock(CallRunner.class); 371 ServerCall largeCall = mock(ServerCall.class); 372 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 373 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 374 when(largeCall.getHeader()).thenReturn(largeHead); 375 376 CallRunner hugeCallTask = mock(CallRunner.class); 377 ServerCall hugeCall = mock(ServerCall.class); 378 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 379 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 380 when(hugeCall.getHeader()).thenReturn(hugeHead); 381 382 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 383 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 384 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 385 386 final ArrayList<Integer> work = new ArrayList<>(); 387 doAnswerTaskExecution(smallCallTask, work, 10, 250); 388 doAnswerTaskExecution(largeCallTask, work, 50, 250); 389 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 390 391 scheduler.dispatch(smallCallTask); 392 scheduler.dispatch(smallCallTask); 393 scheduler.dispatch(smallCallTask); 394 scheduler.dispatch(hugeCallTask); 395 
scheduler.dispatch(smallCallTask); 396 scheduler.dispatch(largeCallTask); 397 scheduler.dispatch(smallCallTask); 398 scheduler.dispatch(smallCallTask); 399 400 while (work.size() < 8) { 401 Thread.sleep(100); 402 } 403 404 int seqSum = 0; 405 int totalTime = 0; 406 for (int i = 0; i < work.size(); ++i) { 407 LOG.debug("Request i=" + i + " value=" + work.get(i)); 408 seqSum += work.get(i); 409 totalTime += seqSum; 410 } 411 LOG.debug("Total Time: " + totalTime); 412 413 // -> [small small small huge small large small small] 414 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 415 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 416 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 417 assertEquals(530, totalTime); 418 } else if ( 419 queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) 420 || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE) 421 ) { 422 assertEquals(930, totalTime); 423 } 424 } finally { 425 scheduler.stop(); 426 } 427 } 428 429 @Test 430 public void testScanQueueWithZeroScanRatio() throws Exception { 431 432 Configuration schedConf = HBaseConfiguration.create(); 433 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 434 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 435 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 436 437 PriorityFunction priority = mock(PriorityFunction.class); 438 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 439 440 RpcScheduler scheduler = 441 new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD); 442 assertNotEquals(null, scheduler); 443 } 444 445 @Test 446 public void testScanQueues() throws Exception { 447 Configuration schedConf = HBaseConfiguration.create(); 448 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 449 
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 450 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 451 452 PriorityFunction priority = mock(PriorityFunction.class); 453 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 454 455 RpcScheduler scheduler = 456 new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); 457 try { 458 scheduler.start(); 459 460 CallRunner putCallTask = mock(CallRunner.class); 461 ServerCall putCall = mock(ServerCall.class); 462 putCall.param = 463 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 464 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 465 when(putCallTask.getRpcCall()).thenReturn(putCall); 466 when(putCall.getHeader()).thenReturn(putHead); 467 when(putCall.getParam()).thenReturn(putCall.param); 468 469 CallRunner getCallTask = mock(CallRunner.class); 470 ServerCall getCall = mock(ServerCall.class); 471 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 472 when(getCallTask.getRpcCall()).thenReturn(getCall); 473 when(getCall.getHeader()).thenReturn(getHead); 474 475 CallRunner scanCallTask = mock(CallRunner.class); 476 ServerCall scanCall = mock(ServerCall.class); 477 scanCall.param = ScanRequest.newBuilder().build(); 478 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 479 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 480 when(scanCall.getHeader()).thenReturn(scanHead); 481 when(scanCall.getParam()).thenReturn(scanCall.param); 482 483 CallRunner bulkLoadCallTask = mock(CallRunner.class); 484 ServerCall bulkLoadCall = mock(ServerCall.class); 485 bulkLoadCall.param = ScanRequest.newBuilder().build(); 486 RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build(); 487 when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall); 488 
when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead); 489 when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param); 490 491 ArrayList<Integer> work = new ArrayList<>(); 492 doAnswerTaskExecution(putCallTask, work, 1, 1000); 493 doAnswerTaskExecution(getCallTask, work, 2, 1000); 494 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 495 doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000); 496 497 // There are 3 queues: [puts], [gets], [scans], [bulkload] 498 // so the calls will be interleaved 499 scheduler.dispatch(putCallTask); 500 scheduler.dispatch(putCallTask); 501 scheduler.dispatch(putCallTask); 502 scheduler.dispatch(getCallTask); 503 scheduler.dispatch(getCallTask); 504 scheduler.dispatch(getCallTask); 505 scheduler.dispatch(scanCallTask); 506 scheduler.dispatch(scanCallTask); 507 scheduler.dispatch(scanCallTask); 508 scheduler.dispatch(bulkLoadCallTask); 509 scheduler.dispatch(bulkLoadCallTask); 510 scheduler.dispatch(bulkLoadCallTask); 511 while (work.size() < 6) { 512 Thread.sleep(100); 513 } 514 515 for (int i = 0; i < work.size() - 2; i += 3) { 516 assertNotEquals(work.get(i + 0), work.get(i + 1)); 517 assertNotEquals(work.get(i + 0), work.get(i + 2)); 518 assertNotEquals(work.get(i + 1), work.get(i + 2)); 519 } 520 } finally { 521 scheduler.stop(); 522 } 523 } 524 525 private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results, 526 final int value, final int sleepInterval) { 527 callTask.setStatus(new MonitoredRPCHandlerImpl("test")); 528 doAnswer(new Answer<Object>() { 529 @Override 530 public Object answer(InvocationOnMock invocation) { 531 synchronized (results) { 532 results.add(value); 533 } 534 Threads.sleepWithoutInterrupt(sleepInterval); 535 return null; 536 } 537 }).when(callTask).run(); 538 } 539 540 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 541 throws InterruptedException { 542 while (scheduler.getGeneralQueueLength() > 0) { 543 Thread.sleep(100); 544 } 545 } 546 547 
@Test 548 public void testSoftAndHardQueueLimits() throws Exception { 549 550 Configuration schedConf = HBaseConfiguration.create(); 551 552 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 553 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 554 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 555 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 556 557 PriorityFunction priority = mock(PriorityFunction.class); 558 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 559 SimpleRpcScheduler scheduler = 560 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 561 try { 562 scheduler.start(); 563 564 CallRunner putCallTask = mock(CallRunner.class); 565 ServerCall putCall = mock(ServerCall.class); 566 putCall.param = 567 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 568 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 569 when(putCallTask.getRpcCall()).thenReturn(putCall); 570 when(putCall.getHeader()).thenReturn(putHead); 571 572 assertTrue(scheduler.dispatch(putCallTask)); 573 574 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 575 scheduler.onConfigurationChange(schedConf); 576 assertFalse(scheduler.dispatch(putCallTask)); 577 waitUntilQueueEmpty(scheduler); 578 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 579 scheduler.onConfigurationChange(schedConf); 580 assertTrue(scheduler.dispatch(putCallTask)); 581 } finally { 582 scheduler.stop(); 583 } 584 } 585 586 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 587 588 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 589 590 private long offset; 591 592 private final Set<String> threadNamePrefixs = new HashSet<>(); 593 594 @Override 595 public long currentTime() { 596 for (String threadNamePrefix : threadNamePrefixs) { 597 String threadName = Thread.currentThread().getName(); 598 
if (threadName.startsWith(threadNamePrefix)) { 599 if (timeQ != null) { 600 Long qTime = timeQ.poll(); 601 if (qTime != null) { 602 return qTime.longValue() + offset; 603 } 604 } 605 } 606 } 607 return System.currentTimeMillis(); 608 } 609 } 610 611 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay 612 // from the get go. So we are always overloaded. The test below would seem to complete the 613 // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping 614 // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for 615 // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e. 616 // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath 617 // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, 618 // numCallQueues... Codel is hard to test. This test is going to be flakey given it all 619 // timer-based. Disabling for now till chat with authors. 620 @Test 621 public void testCoDelScheduling() throws Exception { 622 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 623 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 624 Configuration schedConf = HBaseConfiguration.create(); 625 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 626 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 627 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 628 PriorityFunction priority = mock(PriorityFunction.class); 629 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 630 SimpleRpcScheduler scheduler = 631 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 632 try { 633 // Loading mocked call runner can take a good amount of time the first time through 634 // (haven't looked why). Load it for first time here outside of the timed loop. 
635 getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2); 636 scheduler.start(); 637 EnvironmentEdgeManager.injectEdge(envEdge); 638 envEdge.offset = 5; 639 // Calls faster than min delay 640 // LOG.info("Start"); 641 for (int i = 0; i < 100; i++) { 642 long time = EnvironmentEdgeManager.currentTime(); 643 envEdge.timeQ.put(time); 644 CallRunner cr = getMockedCallRunner(time, 2); 645 scheduler.dispatch(cr); 646 } 647 // LOG.info("Loop done"); 648 // make sure fast calls are handled 649 waitUntilQueueEmpty(scheduler); 650 Thread.sleep(100); 651 assertEquals("None of these calls should have been discarded", 0, 652 scheduler.getNumGeneralCallsDropped()); 653 654 envEdge.offset = 151; 655 // calls slower than min delay, but not individually slow enough to be dropped 656 for (int i = 0; i < 20; i++) { 657 long time = EnvironmentEdgeManager.currentTime(); 658 envEdge.timeQ.put(time); 659 CallRunner cr = getMockedCallRunner(time, 2); 660 scheduler.dispatch(cr); 661 } 662 663 // make sure somewhat slow calls are handled 664 waitUntilQueueEmpty(scheduler); 665 Thread.sleep(100); 666 assertEquals("None of these calls should have been discarded", 0, 667 scheduler.getNumGeneralCallsDropped()); 668 669 envEdge.offset = 2000; 670 // now slow calls and the ones to be dropped 671 for (int i = 0; i < 60; i++) { 672 long time = EnvironmentEdgeManager.currentTime(); 673 envEdge.timeQ.put(time); 674 CallRunner cr = getMockedCallRunner(time, 100); 675 scheduler.dispatch(cr); 676 } 677 678 // make sure somewhat slow calls are handled 679 waitUntilQueueEmpty(scheduler); 680 Thread.sleep(100); 681 assertTrue("There should have been at least 12 calls dropped however there were " 682 + scheduler.getNumGeneralCallsDropped(), scheduler.getNumGeneralCallsDropped() > 12); 683 } finally { 684 scheduler.stop(); 685 } 686 } 687 688 @Test 689 public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { 690 String name = testName.getMethodName(); 691 int handlerCount 
= 1; 692 String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; 693 int maxQueueLength = 0; 694 PriorityFunction priority = mock(PriorityFunction.class); 695 Configuration conf = HBaseConfiguration.create(); 696 Abortable abortable = mock(Abortable.class); 697 FastPathBalancedQueueRpcExecutor executor = 698 Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, 699 maxQueueLength, priority, conf, abortable)); 700 CallRunner task = mock(CallRunner.class); 701 assertFalse(executor.dispatch(task)); 702 // make sure we never internally get a handler, which would skip the queue validation 703 Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), 704 Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 705 } 706 707 @Test 708 public void testMetaRWScanQueues() throws Exception { 709 Configuration schedConf = HBaseConfiguration.create(); 710 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 711 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 712 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 713 714 PriorityFunction priority = mock(PriorityFunction.class); 715 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 716 717 RpcScheduler scheduler = 718 new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD); 719 try { 720 scheduler.start(); 721 722 CallRunner putCallTask = mock(CallRunner.class); 723 ServerCall putCall = mock(ServerCall.class); 724 putCall.param = 725 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 726 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 727 when(putCallTask.getRpcCall()).thenReturn(putCall); 728 when(putCall.getHeader()).thenReturn(putHead); 729 when(putCall.getParam()).thenReturn(putCall.param); 730 731 CallRunner 
getCallTask = mock(CallRunner.class); 732 ServerCall getCall = mock(ServerCall.class); 733 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 734 when(getCallTask.getRpcCall()).thenReturn(getCall); 735 when(getCall.getHeader()).thenReturn(getHead); 736 737 CallRunner scanCallTask = mock(CallRunner.class); 738 ServerCall scanCall = mock(ServerCall.class); 739 scanCall.param = ScanRequest.newBuilder().build(); 740 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 741 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 742 when(scanCall.getHeader()).thenReturn(scanHead); 743 when(scanCall.getParam()).thenReturn(scanCall.param); 744 745 ArrayList<Integer> work = new ArrayList<>(); 746 doAnswerTaskExecution(putCallTask, work, 1, 1000); 747 doAnswerTaskExecution(getCallTask, work, 2, 1000); 748 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 749 750 // There are 3 queues: [puts], [gets], [scans] 751 // so the calls will be interleaved 752 scheduler.dispatch(putCallTask); 753 scheduler.dispatch(putCallTask); 754 scheduler.dispatch(putCallTask); 755 scheduler.dispatch(getCallTask); 756 scheduler.dispatch(getCallTask); 757 scheduler.dispatch(getCallTask); 758 scheduler.dispatch(scanCallTask); 759 scheduler.dispatch(scanCallTask); 760 scheduler.dispatch(scanCallTask); 761 762 while (work.size() < 6) { 763 Thread.sleep(100); 764 } 765 766 for (int i = 0; i < work.size() - 2; i += 3) { 767 assertNotEquals(work.get(i + 0), work.get(i + 1)); 768 assertNotEquals(work.get(i + 0), work.get(i + 2)); 769 assertNotEquals(work.get(i + 1), work.get(i + 2)); 770 } 771 } finally { 772 scheduler.stop(); 773 } 774 } 775 776 // Get mocked call that has the CallRunner sleep for a while so that the fast 777 // path isn't hit. 
/**
   * Builds a {@link CallRunner} around a "mutate" {@link ServerCall} whose
   * {@code sendResponseIfReady()} and {@code drop()} do nothing and whose {@code run()} only
   * sleeps.
   * @param timestamp value forwarded to the {@link ServerCall} constructor
   * @param sleepTime milliseconds {@code run()} sleeps; {@code run()} returns immediately when
   *                  this is zero or negative
   * @return the call runner wrapping the created call
   * @throws IOException declared because building the mutate request may throw it
   */
  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
    ServerCall putCall = new ServerCall(1, null, null,
      RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
      RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
      null, null, 9, null, timestamp, 0, null, null, null) {

      @Override
      public void sendResponseIfReady() throws IOException {
        // Intentionally empty: no response is sent for this test call.
      }
    };

    return new CallRunner(null, putCall) {
      @Override
      public void run() {
        if (sleepTime <= 0) {
          return;
        }
        try {
          LOG.warn("Sleeping for " + sleepTime);
          Thread.sleep(sleepTime);
          LOG.warn("Done Sleeping for " + sleepTime);
        } catch (InterruptedException e) {
          // Stop sleeping, but keep the interrupt visible to the thread running this call
          // instead of silently discarding it.
          Thread.currentThread().interrupt();
        }
      }

      @Override
      public RpcCall getRpcCall() {
        return putCall;
      }

      @Override
      public void drop() {
        // Intentionally empty: dropping this test call has no side effects.
      }
    };
  }
}