001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.timeout; 030import static org.mockito.Mockito.verify; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.lang.reflect.Field; 035import java.net.InetSocketAddress; 036import java.util.ArrayList; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.BlockingQueue; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.LinkedBlockingQueue; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.Abortable; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseConfiguration; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 051import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.testclassification.RPCTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.EnvironmentEdge; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.hbase.util.Threads; 058import org.junit.Before; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064import org.mockito.Mockito; 065import org.mockito.invocation.InvocationOnMock; 066import org.mockito.stubbing.Answer; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 072import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 073import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 074 075import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 079 080@Category({ RPCTests.class, MediumTests.class }) 081public class TestSimpleRpcScheduler { 082 083 @ClassRule 084 public static final HBaseClassTestRule CLASS_RULE = 085 HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class); 086 087 @Rule 088 public TestName testName = new TestName(); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class); 091 092 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { 093 @Override 094 public InetSocketAddress getListenerAddress() { 095 return InetSocketAddress.createUnresolved("127.0.0.1", 1000); 096 } 097 }; 098 private Configuration conf; 099 100 @Before 101 public void setUp() { 102 conf = HBaseConfiguration.create(); 103 } 104 105 @Test 106 public void testBasic() throws IOException, InterruptedException { 107 108 PriorityFunction qosFunction = mock(PriorityFunction.class); 109 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0); 110 scheduler.init(CONTEXT); 111 scheduler.start(); 112 CallRunner task = createMockTask(HConstants.NORMAL_QOS); 113 task.setStatus(new MonitoredRPCHandlerImpl("test")); 114 scheduler.dispatch(task); 115 verify(task, timeout(10000)).run(); 116 scheduler.stop(); 117 } 118 119 private RpcScheduler disableHandlers(RpcScheduler scheduler) { 120 try { 121 Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); 122 ExecutorField.setAccessible(true); 123 124 RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler); 125 126 Field handlerCountField = 127 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount"); 128 129 handlerCountField.setAccessible(true); 130 handlerCountField.set(rpcExecutor, 0); 131 132 Field numCallQueuesField = 133 rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues"); 134 135 numCallQueuesField.setAccessible(true); 136 numCallQueuesField.set(rpcExecutor, 1); 137 138 Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass() 139 .getDeclaredField("currentQueueLimit"); 140 141 currentQueueLimitField.setAccessible(true); 142 currentQueueLimitField.set(rpcExecutor, 100); 143 144 } catch (NoSuchFieldException e) { 145 LOG.error("No such field exception" + e); 146 } catch (IllegalAccessException e) { 147 LOG.error("Illegal access exception" + e); 148 } 149 150 return scheduler; 151 } 152 153 @Test 154 public void testCallQueueInfo() throws IOException, InterruptedException { 155 156 PriorityFunction qosFunction = mock(PriorityFunction.class); 157 RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0); 158 159 scheduler.init(CONTEXT); 160 161 // Set the handlers to zero. So that number of requests in call Queue can be tested 162 scheduler = disableHandlers(scheduler); 163 scheduler.start(); 164 165 int totalCallMethods = 10; 166 for (int i = totalCallMethods; i > 0; i--) { 167 CallRunner task = createMockTask(HConstants.NORMAL_QOS); 168 task.setStatus(new MonitoredRPCHandlerImpl("test")); 169 scheduler.dispatch(task); 170 } 171 172 CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); 173 174 for (String callQueueName : callQueueInfo.getCallQueueNames()) { 175 176 for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) { 177 assertEquals(totalCallMethods, 178 callQueueInfo.getCallMethodCount(callQueueName, calledMethod)); 179 } 180 181 } 182 183 scheduler.stop(); 184 185 } 186 187 @Test 188 public void testHandlerIsolation() throws IOException, InterruptedException { 189 CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS); 190 CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1); 191 CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS); 192 List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask); 193 Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask, 194 HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS); 195 PriorityFunction qosFunction = mock(PriorityFunction.class); 196 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap(); 197 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); 198 Answer<Void> answerToRun = new Answer<Void>() { 199 @Override 200 public Void answer(InvocationOnMock invocationOnMock) throws Throwable { 201 synchronized (handlerThreads) { 202 handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread()); 203 } 204 countDownLatch.countDown(); 205 return null; 206 } 207 }; 208 for (CallRunner task : tasks) { 209 task.setStatus(new MonitoredRPCHandlerImpl("test")); 210 doAnswer(answerToRun).when(task).run(); 211 } 212 213 RpcScheduler scheduler = 214 new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.HIGH_QOS); 215 scheduler.init(CONTEXT); 216 scheduler.start(); 217 for (CallRunner task : tasks) { 218 when(qosFunction.getPriority(any(), any(), any())).thenReturn(qos.get(task)); 219 scheduler.dispatch(task); 220 } 221 for (CallRunner task : tasks) { 222 verify(task, timeout(10000)).run(); 223 } 224 scheduler.stop(); 225 226 // Tests that these requests are handled by three distinct threads. 227 countDownLatch.await(); 228 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); 229 } 230 231 private CallRunner createMockTask(int priority) { 232 ServerCall call = mock(ServerCall.class); 233 CallRunner task = mock(CallRunner.class); 234 RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build(); 235 when(task.getRpcCall()).thenReturn(call); 236 when(call.getHeader()).thenReturn(header); 237 return task; 238 } 239 240 @Test 241 public void testRpcScheduler() throws Exception { 242 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 243 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 244 } 245 246 @Test 247 public void testPluggableRpcQueue() throws Exception { 248 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 249 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 250 251 try { 252 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, "MissingClass"); 253 fail("Expected a PluggableRpcQueueNotFound for unloaded class"); 254 } catch (PluggableRpcQueueNotFound e) { 255 // expected 256 } catch (Exception e) { 257 fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e); 258 } 259 260 try { 261 testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, 262 "org.apache.hadoop.hbase.ipc.SimpleRpcServer"); 263 fail("Expected a PluggableRpcQueueNotFound for incompatible class"); 264 } catch (PluggableRpcQueueNotFound e) { 265 // expected 266 } catch (Exception e) { 267 fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e); 268 } 269 } 270 271 @Test 272 public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception { 273 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 274 Configuration schedConf = HBaseConfiguration.create(); 275 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 276 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 277 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 278 schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true); 279 280 PriorityFunction priority = mock(PriorityFunction.class); 281 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 282 SimpleRpcScheduler scheduler = 283 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 284 285 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 286 f.setAccessible(true); 287 assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor); 288 } 289 290 @Test 291 public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception { 292 String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; 293 Configuration schedConf = HBaseConfiguration.create(); 294 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 295 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 296 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 297 298 PriorityFunction priority = mock(PriorityFunction.class); 299 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 300 SimpleRpcScheduler scheduler = 301 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 302 303 Field f = scheduler.getClass().getDeclaredField("callExecutor"); 304 f.setAccessible(true); 305 assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor); 306 } 307 308 @Test 309 public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { 310 311 Configuration schedConf = HBaseConfiguration.create(); 312 313 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); 314 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 315 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 316 RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); 317 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 318 "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); 319 320 PriorityFunction priority = mock(PriorityFunction.class); 321 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 322 SimpleRpcScheduler scheduler = 323 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 324 try { 325 scheduler.start(); 326 327 CallRunner putCallTask = mock(CallRunner.class); 328 ServerCall putCall = mock(ServerCall.class); 329 putCall.param = 330 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 331 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 332 when(putCallTask.getRpcCall()).thenReturn(putCall); 333 when(putCall.getHeader()).thenReturn(putHead); 334 335 assertTrue(scheduler.dispatch(putCallTask)); 336 337 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4); 338 scheduler.onConfigurationChange(schedConf); 339 assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange()); 340 waitUntilQueueEmpty(scheduler); 341 } finally { 342 scheduler.stop(); 343 } 344 } 345 346 private void testRpcScheduler(final String queueType) throws Exception { 347 testRpcScheduler(queueType, null); 348 } 349 350 private void testRpcScheduler(final String queueType, final String pluggableQueueClass) 351 throws Exception { 352 Configuration schedConf = HBaseConfiguration.create(); 353 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); 354 355 if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) { 356 schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass); 357 } 358 359 PriorityFunction priority = mock(PriorityFunction.class); 360 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 361 362 RpcScheduler scheduler = 363 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 364 try { 365 scheduler.start(); 366 367 CallRunner smallCallTask = mock(CallRunner.class); 368 ServerCall smallCall = mock(ServerCall.class); 369 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); 370 when(smallCallTask.getRpcCall()).thenReturn(smallCall); 371 when(smallCall.getHeader()).thenReturn(smallHead); 372 373 CallRunner largeCallTask = mock(CallRunner.class); 374 ServerCall largeCall = mock(ServerCall.class); 375 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); 376 when(largeCallTask.getRpcCall()).thenReturn(largeCall); 377 when(largeCall.getHeader()).thenReturn(largeHead); 378 379 CallRunner hugeCallTask = mock(CallRunner.class); 380 ServerCall hugeCall = mock(ServerCall.class); 381 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); 382 when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); 383 when(hugeCall.getHeader()).thenReturn(hugeHead); 384 385 when(priority.getDeadline(eq(smallHead), any())).thenReturn(0L); 386 when(priority.getDeadline(eq(largeHead), any())).thenReturn(50L); 387 when(priority.getDeadline(eq(hugeHead), any())).thenReturn(100L); 388 389 final ArrayList<Integer> work = new ArrayList<>(); 390 doAnswerTaskExecution(smallCallTask, work, 10, 250); 391 doAnswerTaskExecution(largeCallTask, work, 50, 250); 392 doAnswerTaskExecution(hugeCallTask, work, 100, 250); 393 394 scheduler.dispatch(smallCallTask); 395 scheduler.dispatch(smallCallTask); 396 scheduler.dispatch(smallCallTask); 397 scheduler.dispatch(hugeCallTask); 398 scheduler.dispatch(smallCallTask); 399 scheduler.dispatch(largeCallTask); 400 scheduler.dispatch(smallCallTask); 401 scheduler.dispatch(smallCallTask); 402 403 while (work.size() < 8) { 404 Thread.sleep(100); 405 } 406 407 int seqSum = 0; 408 int totalTime = 0; 409 for (int i = 0; i < work.size(); ++i) { 410 LOG.debug("Request i=" + i + " value=" + work.get(i)); 411 seqSum += work.get(i); 412 totalTime += seqSum; 413 } 414 LOG.debug("Total Time: " + totalTime); 415 416 // -> [small small small huge small large small small] 417 // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) 418 // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) 419 if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { 420 assertEquals(530, totalTime); 421 } else if ( 422 queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) 423 || queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE) 424 ) { 425 assertEquals(930, totalTime); 426 } 427 } finally { 428 scheduler.stop(); 429 } 430 } 431 432 @Test 433 public void testScanQueueWithZeroScanRatio() throws Exception { 434 435 Configuration schedConf = HBaseConfiguration.create(); 436 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 437 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); 438 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f); 439 440 PriorityFunction priority = mock(PriorityFunction.class); 441 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 442 443 RpcScheduler scheduler = 444 new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, HConstants.QOS_THRESHOLD); 445 assertNotEquals(null, scheduler); 446 } 447 448 @Test 449 public void testScanQueues() throws Exception { 450 Configuration schedConf = HBaseConfiguration.create(); 451 schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 452 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 453 schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 454 455 PriorityFunction priority = mock(PriorityFunction.class); 456 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 457 458 RpcScheduler scheduler = 459 new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); 460 try { 461 scheduler.start(); 462 463 CallRunner putCallTask = mock(CallRunner.class); 464 ServerCall putCall = mock(ServerCall.class); 465 putCall.param = 466 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 467 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 468 when(putCallTask.getRpcCall()).thenReturn(putCall); 469 when(putCall.getHeader()).thenReturn(putHead); 470 when(putCall.getParam()).thenReturn(putCall.param); 471 472 CallRunner getCallTask = mock(CallRunner.class); 473 ServerCall getCall = mock(ServerCall.class); 474 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); 475 when(getCallTask.getRpcCall()).thenReturn(getCall); 476 when(getCall.getHeader()).thenReturn(getHead); 477 478 CallRunner scanCallTask = mock(CallRunner.class); 479 ServerCall scanCall = mock(ServerCall.class); 480 scanCall.param = ScanRequest.newBuilder().build(); 481 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); 482 when(scanCallTask.getRpcCall()).thenReturn(scanCall); 483 when(scanCall.getHeader()).thenReturn(scanHead); 484 when(scanCall.getParam()).thenReturn(scanCall.param); 485 486 CallRunner bulkLoadCallTask = mock(CallRunner.class); 487 ServerCall bulkLoadCall = mock(ServerCall.class); 488 bulkLoadCall.param = ScanRequest.newBuilder().build(); 489 RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build(); 490 when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall); 491 when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead); 492 when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param); 493 494 ArrayList<Integer> work = new ArrayList<>(); 495 doAnswerTaskExecution(putCallTask, work, 1, 1000); 496 doAnswerTaskExecution(getCallTask, work, 2, 1000); 497 doAnswerTaskExecution(scanCallTask, work, 3, 1000); 498 doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000); 499 500 // There are 3 queues: [puts], [gets], [scans], [bulkload] 501 // so the calls will be interleaved 502 scheduler.dispatch(putCallTask); 503 scheduler.dispatch(putCallTask); 504 scheduler.dispatch(putCallTask); 505 scheduler.dispatch(getCallTask); 506 scheduler.dispatch(getCallTask); 507 scheduler.dispatch(getCallTask); 508 scheduler.dispatch(scanCallTask); 509 scheduler.dispatch(scanCallTask); 510 scheduler.dispatch(scanCallTask); 511 scheduler.dispatch(bulkLoadCallTask); 512 scheduler.dispatch(bulkLoadCallTask); 513 scheduler.dispatch(bulkLoadCallTask); 514 while (work.size() < 6) { 515 Thread.sleep(100); 516 } 517 518 for (int i = 0; i < work.size() - 2; i += 3) { 519 assertNotEquals(work.get(i + 0), work.get(i + 1)); 520 assertNotEquals(work.get(i + 0), work.get(i + 2)); 521 assertNotEquals(work.get(i + 1), work.get(i + 2)); 522 } 523 } finally { 524 scheduler.stop(); 525 } 526 } 527 528 private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results, 529 final int value, final int sleepInterval) { 530 callTask.setStatus(new MonitoredRPCHandlerImpl("test")); 531 doAnswer(new Answer<Object>() { 532 @Override 533 public Object answer(InvocationOnMock invocation) { 534 synchronized (results) { 535 results.add(value); 536 } 537 Threads.sleepWithoutInterrupt(sleepInterval); 538 return null; 539 } 540 }).when(callTask).run(); 541 } 542 543 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) 544 throws InterruptedException { 545 while (scheduler.getGeneralQueueLength() > 0) { 546 Thread.sleep(100); 547 } 548 } 549 550 @Test 551 public void testSoftAndHardQueueLimits() throws Exception { 552 553 Configuration schedConf = HBaseConfiguration.create(); 554 555 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); 556 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); 557 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 558 RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 559 560 PriorityFunction priority = mock(PriorityFunction.class); 561 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 562 SimpleRpcScheduler scheduler = 563 new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); 564 try { 565 scheduler.start(); 566 567 CallRunner putCallTask = mock(CallRunner.class); 568 ServerCall putCall = mock(ServerCall.class); 569 putCall.param = 570 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 571 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 572 when(putCallTask.getRpcCall()).thenReturn(putCall); 573 when(putCall.getHeader()).thenReturn(putHead); 574 575 assertTrue(scheduler.dispatch(putCallTask)); 576 577 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 578 scheduler.onConfigurationChange(schedConf); 579 assertFalse(scheduler.dispatch(putCallTask)); 580 waitUntilQueueEmpty(scheduler); 581 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); 582 scheduler.onConfigurationChange(schedConf); 583 assertTrue(scheduler.dispatch(putCallTask)); 584 } finally { 585 scheduler.stop(); 586 } 587 } 588 589 private static final class CoDelEnvironmentEdge implements EnvironmentEdge { 590 591 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>(); 592 593 private long offset; 594 595 private final Set<String> threadNamePrefixs = new HashSet<>(); 596 597 @Override 598 public long currentTime() { 599 for (String threadNamePrefix : threadNamePrefixs) { 600 String threadName = Thread.currentThread().getName(); 601 if (threadName.startsWith(threadNamePrefix)) { 602 if (timeQ != null) { 603 Long qTime = timeQ.poll(); 604 if (qTime != null) { 605 return qTime.longValue() + offset; 606 } 607 } 608 } 609 } 610 return System.currentTimeMillis(); 611 } 612 } 613 614 // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay 615 // from the get go. So we are always overloaded. The test below would seem to complete the 616 // queuing of all the CallRunners inside the codel check interval. I don't think we are skipping 617 // codel checking. Second, I think this test has been broken since HBASE-16089 Add on FastPath for 618 // CoDel went in. The thread name we were looking for was the name BEFORE we updated: i.e. 619 // "RpcServer.CodelBQ.default.handler". But same patch changed the name of the codel fastpath 620 // thread to: new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, 621 // numCallQueues... Codel is hard to test. This test is going to be flakey given it all 622 // timer-based. Disabling for now till chat with authors. 623 @Test 624 public void testCoDelScheduling() throws Exception { 625 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); 626 envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler"); 627 Configuration schedConf = HBaseConfiguration.create(); 628 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); 629 schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, 630 RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 631 PriorityFunction priority = mock(PriorityFunction.class); 632 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); 633 SimpleRpcScheduler scheduler = 634 new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); 635 try { 636 // Loading mocked call runner can take a good amount of time the first time through 637 // (haven't looked why). Load it for first time here outside of the timed loop. 638 getMockedCallRunner(EnvironmentEdgeManager.currentTime(), 2); 639 scheduler.start(); 640 EnvironmentEdgeManager.injectEdge(envEdge); 641 envEdge.offset = 5; 642 // Calls faster than min delay 643 // LOG.info("Start"); 644 for (int i = 0; i < 100; i++) { 645 long time = EnvironmentEdgeManager.currentTime(); 646 envEdge.timeQ.put(time); 647 CallRunner cr = getMockedCallRunner(time, 2); 648 scheduler.dispatch(cr); 649 } 650 // LOG.info("Loop done"); 651 // make sure fast calls are handled 652 waitUntilQueueEmpty(scheduler); 653 Thread.sleep(100); 654 assertEquals("None of these calls should have been discarded", 0, 655 scheduler.getNumGeneralCallsDropped()); 656 657 envEdge.offset = 151; 658 // calls slower than min delay, but not individually slow enough to be dropped 659 for (int i = 0; i < 20; i++) { 660 long time = EnvironmentEdgeManager.currentTime(); 661 envEdge.timeQ.put(time); 662 CallRunner cr = getMockedCallRunner(time, 2); 663 scheduler.dispatch(cr); 664 } 665 666 // make sure somewhat slow calls are handled 667 waitUntilQueueEmpty(scheduler); 668 Thread.sleep(100); 669 assertEquals("None of these calls should have been discarded", 0, 670 scheduler.getNumGeneralCallsDropped()); 671 672 envEdge.offset = 2000; 673 // now slow calls and the ones to be dropped 674 for (int i = 0; i < 60; i++) { 675 long time = EnvironmentEdgeManager.currentTime(); 676 envEdge.timeQ.put(time); 677 CallRunner cr = getMockedCallRunner(time, 100); 678 scheduler.dispatch(cr); 679 } 680 681 // make sure somewhat slow calls are handled 682 waitUntilQueueEmpty(scheduler); 683 Thread.sleep(100); 684 assertTrue("There should have been at least 12 calls dropped however there were " 685 + scheduler.getNumGeneralCallsDropped(), scheduler.getNumGeneralCallsDropped() > 12); 686 } finally { 687 scheduler.stop(); 688 } 689 } 690 691 @Test 692 public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { 693 String name = testName.getMethodName(); 694 int handlerCount = 1; 695 String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; 696 int maxQueueLength = 0; 697 PriorityFunction priority = mock(PriorityFunction.class); 698 Configuration conf = HBaseConfiguration.create(); 699 Abortable abortable = mock(Abortable.class); 700 FastPathBalancedQueueRpcExecutor executor = 701 Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, 702 maxQueueLength, priority, conf, abortable)); 703 CallRunner task = mock(CallRunner.class); 704 assertFalse(executor.dispatch(task)); 705 // make sure we never internally get a handler, which would skip the queue validation 706 Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), 707 Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 708 } 709 710 @Test 711 public void testMetaRWScanQueues() throws Exception { 712 Configuration schedConf = HBaseConfiguration.create(); 713 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); 714 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); 715 schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); 716 717 PriorityFunction priority = mock(PriorityFunction.class); 718 when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); 719 720 RpcScheduler scheduler = 721 new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, HConstants.QOS_THRESHOLD); 722 try { 723 scheduler.start(); 724 725 CallRunner putCallTask = mock(CallRunner.class); 726 ServerCall putCall = mock(ServerCall.class); 727 putCall.param = 728 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); 729 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); 730 when(putCallTask.getRpcCall()).thenReturn(putCall); 731 when(putCall.getHeader()).thenReturn(putHead); 732 when(putCall.getParam()).thenReturn(putCall.param); 733 734 CallRunner clientReadCallTask = mock(CallRunner.class); 735 ServerCall clientReadCall = mock(ServerCall.class); 736 RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build(); 737 when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall); 738 when(clientReadCall.getHeader()).thenReturn(clientReadHead); 739 740 CallRunner internalReadCallTask = mock(CallRunner.class); 741 ServerCall internalReadCall = mock(ServerCall.class); 742 internalReadCall.param = ScanRequest.newBuilder().build(); 743 RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan") 744 .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build(); 745 when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall); 746 when(internalReadCall.getHeader()).thenReturn(masterReadHead); 747 when(internalReadCall.getParam()).thenReturn(internalReadCall.param); 748 749 ArrayList<Integer> work = new ArrayList<>(); 750 doAnswerTaskExecution(putCallTask, work, 1, 1000); 751 doAnswerTaskExecution(clientReadCallTask, work, 2, 1000); 752 doAnswerTaskExecution(internalReadCallTask, work, 3, 1000); 753 754 // There are 3 queues: [puts], [gets], [scans] 755 // so the calls will be interleaved 756 scheduler.dispatch(putCallTask); 757 scheduler.dispatch(putCallTask); 758 scheduler.dispatch(putCallTask); 759 scheduler.dispatch(clientReadCallTask); 760 scheduler.dispatch(clientReadCallTask); 761 scheduler.dispatch(clientReadCallTask); 762 scheduler.dispatch(internalReadCallTask); 763 scheduler.dispatch(internalReadCallTask); 764 scheduler.dispatch(internalReadCallTask); 765 766 while (work.size() < 6) { 767 Thread.sleep(100); 768 } 769 770 for (int i = 0; i < work.size() - 2; i += 3) { 771 assertNotEquals(work.get(i + 0), work.get(i + 1)); 772 assertNotEquals(work.get(i + 0), work.get(i + 2)); 773 assertNotEquals(work.get(i + 1), work.get(i + 2)); 774 } 775 } finally { 776 scheduler.stop(); 777 } 778 } 779 780 // Get mocked call that has the CallRunner sleep for a while so that the fast 781 // path isn't hit. 782 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { 783 ServerCall putCall = new ServerCall(1, null, null, 784 RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), 785 RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), 786 null, null, 9, null, timestamp, 0, null, null, null) { 787 788 @Override 789 public void sendResponseIfReady() throws IOException { 790 } 791 }; 792 793 return new CallRunner(null, putCall) { 794 @Override 795 public void run() { 796 if (sleepTime <= 0) { 797 return; 798 } 799 try { 800 LOG.warn("Sleeping for " + sleepTime); 801 Thread.sleep(sleepTime); 802 LOG.warn("Done Sleeping for " + sleepTime); 803 } catch (InterruptedException e) { 804 } 805 } 806 807 @Override 808 public RpcCall getRpcCall() { 809 return putCall; 810 } 811 812 @Override 813 public void drop() { 814 } 815 }; 816 } 817}