001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.util.concurrent.TimeUnit; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.testclassification.RegionServerTests; 028import org.apache.hadoop.hbase.testclassification.SmallTests; 029import org.apache.hadoop.hbase.util.EnvironmentEdge; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.junit.After; 032import org.junit.Before; 033import org.junit.ClassRule; 034import org.junit.Test; 035import org.junit.experimental.categories.Category; 036 037/** 038 * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive backoff multipliers and 039 * over-subscription functionality. 040 */ 041@Category({ RegionServerTests.class, SmallTests.class }) 042public class TestFeedbackAdaptiveRateLimiter { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestFeedbackAdaptiveRateLimiter.class); 047 048 private ManualEnvironmentEdge testEdge; 049 private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory factory; 050 051 @Before 052 public void setUp() { 053 testEdge = new ManualEnvironmentEdge(); 054 EnvironmentEdgeManager.injectEdge(testEdge); 055 056 Configuration conf = HBaseConfiguration.create(); 057 // Set refill interval for testing 058 conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500); 059 // Configure adaptive parameters for testing - using larger values than defaults for 060 // observability 061 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1); 062 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT, 063 0.05); 064 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0); 065 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01); 066 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005); 067 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2); 068 conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1); 069 070 factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf); 071 } 072 073 @After 074 public void tearDown() { 075 EnvironmentEdgeManager.reset(); 076 } 077 078 @Test 079 public void testBasicFunctionality() { 080 FeedbackAdaptiveRateLimiter limiter = factory.create(); 081 limiter.set(10, TimeUnit.SECONDS); 082 083 // Initially should work like normal rate limiter 084 assertEquals(0, limiter.getWaitIntervalMs()); 085 limiter.consume(5); 086 assertEquals(0, limiter.getWaitIntervalMs()); 087 limiter.consume(5); 088 089 // Should need to wait after consuming full limit 090 assertTrue(limiter.getWaitIntervalMs() > 0); 091 } 092 093 @Test 094 public void testAdaptiveBackoffIncreases() { 095 FeedbackAdaptiveRateLimiter limiter = factory.create(); 096 limiter.set(10, TimeUnit.SECONDS); 097 098 testEdge.setValue(1000); 099 limiter.refill(10); 100 101 // Record initial wait interval 102 limiter.consume(10); 103 long initialWaitInterval = limiter.getWaitInterval(10, 0, 1); 104 assertTrue("Initial wait interval should be positive", initialWaitInterval > 0); 105 106 // Create sustained contention over multiple intervals to increase backoff 107 for (int i = 0; i < 5; i++) { 108 testEdge.setValue(1000 + (i + 1) * 500); 109 limiter.refill(10); 110 limiter.consume(10); 111 // Create contention by asking for more than available 112 limiter.getWaitInterval(10, 0, 1); 113 } 114 115 // After contention, wait interval should increase due to backoff multiplier 116 testEdge.setValue(4000); 117 limiter.refill(10); 118 limiter.consume(10); 119 long increasedWaitInterval = limiter.getWaitInterval(10, 0, 1); 120 121 // With backoffMultiplierIncrement=0.1 and 5 intervals of contention, 122 // multiplier should be around 1.5, so wait should be significantly higher 123 assertTrue( 124 "Wait interval should increase with contention. Initial: " + initialWaitInterval 125 + ", After contention: " + increasedWaitInterval, 126 increasedWaitInterval > initialWaitInterval * 1.3); 127 } 128 129 @Test 130 public void testAdaptiveBackoffDecreases() { 131 FeedbackAdaptiveRateLimiter limiter = factory.create(); 132 limiter.set(10, TimeUnit.SECONDS); 133 134 testEdge.setValue(1000); 135 limiter.refill(10); 136 137 // Build up contention to increase backoff multiplier 138 for (int i = 0; i < 5; i++) { 139 limiter.consume(10); 140 limiter.getWaitInterval(10, 0, 1); // Create contention 141 testEdge.setValue(1000 + (i + 1) * 500); 142 limiter.refill(10); 143 } 144 145 // Measure wait interval with elevated backoff 146 limiter.consume(10); 147 long elevatedWaitInterval = limiter.getWaitInterval(10, 0, 1); 148 149 // Run several intervals without contention to decrease backoff 150 for (int i = 0; i < 10; i++) { 151 testEdge.setValue(4000 + i * 500); 152 limiter.refill(10); 153 // Consume less than available - no contention 154 limiter.consume(3); 155 } 156 157 // Measure wait interval after backoff reduction 158 testEdge.setValue(9500); 159 limiter.refill(10); 160 limiter.consume(10); 161 long reducedWaitInterval = limiter.getWaitInterval(10, 0, 1); 162 163 // After 10 intervals without contention (decrement=0.05 each), 164 // multiplier should decrease by ~0.5, making wait interval lower 165 assertTrue("Wait interval should decrease without contention. Elevated: " + elevatedWaitInterval 166 + ", Reduced: " + reducedWaitInterval, reducedWaitInterval < elevatedWaitInterval * 0.9); 167 } 168 169 @Test 170 public void testOversubscriptionIncreasesWithLowUtilization() { 171 FeedbackAdaptiveRateLimiter limiter = factory.create(); 172 limiter.set(10, TimeUnit.SECONDS); 173 174 testEdge.setValue(1000); 175 176 // Initial refill to set up the limiter 177 long initialRefill = limiter.refill(10); 178 assertEquals("Initial refill should match limit", 10, initialRefill); 179 180 // Create low utilization scenario (consuming much less than available) 181 // With error budget of 0.1, min target utilization is 0.9 182 // We'll consume only ~40% to trigger oversubscription increase 183 // Refill interval adjusted limit is 5 (500ms / 1000ms * 10) 184 for (int i = 0; i < 30; i++) { 185 // Consume before advancing time so utilization is tracked 186 limiter.consume(2); // 2 out of 5 = 40% utilization 187 testEdge.setValue(1000 + (i + 1) * 500); 188 limiter.refill(10); 189 } 190 191 // After many intervals of low utilization, oversubscription should have increased 192 // Now test that the oversubscription proportion actually affects refill behavior 193 // Consume all available to start fresh 194 limiter.consume((int) limiter.getAvailable()); 195 196 // Jump forward by 3 refill intervals (1500ms) 197 // This tests that refill can return more than the base limit due to oversubscription 198 testEdge.setValue(16000 + 1500); 199 long multiIntervalRefill = limiter.refill(10); 200 201 // With oversubscription at max (0.2), the oversubscribed limit is 10 * 1.2 = 12 202 // With 3 intervals: refillAmount = 3 * 5 = 15 203 // Result = min(12, 15) = 12, which exceeds the base limit of 10 204 // Without oversubscription, this would be capped at min(10, 15) = 10 205 assertTrue("With oversubscription from low utilization, refill should exceed base limit. Got: " 206 + multiIntervalRefill, multiIntervalRefill > 10); 207 } 208 209 @Test 210 public void testOversubscriptionDecreasesWithHighUtilization() { 211 FeedbackAdaptiveRateLimiter limiter = factory.create(); 212 limiter.set(10, TimeUnit.SECONDS); 213 214 testEdge.setValue(1000); 215 216 // First, build up oversubscription with low utilization 217 limiter.refill(10); 218 for (int i = 0; i < 15; i++) { 219 testEdge.setValue(1000 + (i + 1) * 500); 220 limiter.refill(10); 221 limiter.consume(2); // Low utilization 222 } 223 224 // Now create high utilization scenario (consuming more than target) 225 // With error budget of 0.1, max target utilization is 1.1 226 // We'll consume close to the full interval-adjusted limit to trigger decrease 227 for (int i = 0; i < 10; i++) { 228 testEdge.setValue(8500 + (i + 1) * 500); 229 long refilled = limiter.refill(10); 230 // Consume full amount to show high utilization 231 limiter.consume((int) refilled); 232 } 233 234 // After intervals of high utilization, oversubscription should decrease 235 testEdge.setValue(14000); 236 long refillAfterHighUtil = limiter.refill(10); 237 238 // Oversubscription should have decreased, so refill should be closer to base limit 239 // With oversubscriptionDecrement=0.005 over 10 intervals, it should drop by ~0.05 240 assertTrue( 241 "Refill should be closer to base after high utilization. Got: " + refillAfterHighUtil, 242 refillAfterHighUtil <= 6); 243 } 244 245 @Test 246 public void testBackoffMultiplierCapsAtMaximum() { 247 FeedbackAdaptiveRateLimiter limiter = factory.create(); 248 limiter.set(10, TimeUnit.SECONDS); 249 250 testEdge.setValue(1000); 251 limiter.refill(10); 252 253 // Record base wait interval 254 limiter.consume(10); 255 long baseWaitInterval = limiter.getWaitInterval(10, 0, 1); 256 257 // Create extreme sustained contention to push backoff to max 258 // With increment=0.1 and max=3.0, we need (3.0-1.0)/0.1 = 20 intervals 259 for (int i = 0; i < 25; i++) { 260 testEdge.setValue(1000 + (i + 1) * 500); 261 limiter.refill(10); 262 limiter.consume(10); 263 limiter.getWaitInterval(10, 0, 1); // Create contention 264 } 265 266 // Measure wait at maximum backoff 267 testEdge.setValue(14000); 268 limiter.refill(10); 269 limiter.consume(10); 270 long maxBackoffWaitInterval = limiter.getWaitInterval(10, 0, 1); 271 272 // Wait interval should be approximately 3x base (max multiplier) 273 assertTrue( 274 "Wait interval should cap at max multiplier. Base: " + baseWaitInterval + ", Max backoff: " 275 + maxBackoffWaitInterval, 276 maxBackoffWaitInterval >= baseWaitInterval * 2.5 277 && maxBackoffWaitInterval <= baseWaitInterval * 3.5); 278 279 // Additional contention should not increase wait further 280 testEdge.setValue(14500); 281 limiter.refill(10); 282 limiter.consume(10); 283 limiter.getWaitInterval(10, 0, 1); 284 285 testEdge.setValue(15000); 286 limiter.refill(10); 287 limiter.consume(10); 288 long stillMaxWaitInterval = limiter.getWaitInterval(10, 0, 1); 289 290 // Should still be at max, not increasing further 291 assertTrue( 292 "Wait should remain capped. Previous: " + maxBackoffWaitInterval + ", Current: " 293 + stillMaxWaitInterval, 294 Math.abs(stillMaxWaitInterval - maxBackoffWaitInterval) < baseWaitInterval * 0.2); 295 } 296 297 @Test 298 public void testOversubscriptionCapsAtMaximum() { 299 FeedbackAdaptiveRateLimiter limiter = factory.create(); 300 limiter.set(10, TimeUnit.SECONDS); 301 302 testEdge.setValue(1000); 303 limiter.refill(10); 304 305 // Create extreme low utilization to push oversubscription to max 306 // With increment=0.01 and max=0.2, we need 0.2/0.01 = 20 intervals 307 for (int i = 0; i < 25; i++) { 308 testEdge.setValue(1000 + (i + 1) * 500); 309 limiter.refill(10); 310 // Very low consumption to maximize oversubscription increase 311 limiter.consume(1); 312 } 313 314 // Check that refill is capped at max oversubscription 315 testEdge.setValue(14000); 316 long refillWithMaxOversubscription = limiter.refill(10); 317 318 // With max oversubscription of 0.2, refill should be at most 5 * 1.2 = 6 319 // (5 is the interval-adjusted limit for 500ms refill interval) 320 assertTrue("Refill should cap at max oversubscription. Got: " + refillWithMaxOversubscription, 321 refillWithMaxOversubscription <= 7); 322 323 // Further low utilization should not increase refill 324 testEdge.setValue(14500); 325 limiter.refill(10); 326 limiter.consume(1); 327 328 testEdge.setValue(15000); 329 long stillMaxRefill = limiter.refill(10); 330 331 // Should remain at cap 332 assertEquals("Refill should remain at max oversubscription", refillWithMaxOversubscription, 333 stillMaxRefill); 334 } 335 336 @Test 337 public void testMultipleRefillIntervals() { 338 FeedbackAdaptiveRateLimiter limiter = factory.create(); 339 limiter.set(10, TimeUnit.SECONDS); 340 341 testEdge.setValue(1000); 342 limiter.refill(10); 343 limiter.consume(10); 344 345 // Jump forward by multiple refill intervals (3 intervals = 1500ms) 346 testEdge.setValue(1000 + 1500); 347 348 // Should refill 3 intervals worth, but capped at oversubscribed limit 349 long multiIntervalRefill = limiter.refill(10); 350 351 // With 500ms refill interval, each interval gives 5 resources 352 // 3 intervals = 15, but capped at limit (no oversubscription yet) = 10 353 assertTrue("Multiple interval refill should provide multiple refill amounts. Got: " 354 + multiIntervalRefill, multiIntervalRefill >= 10); 355 } 356 357 @Test 358 public void testRefillIntervalAdjustment() { 359 FeedbackAdaptiveRateLimiter limiter = factory.create(); 360 limiter.set(10, TimeUnit.SECONDS); 361 362 testEdge.setValue(1000); 363 364 // First refill should give full limit 365 long firstRefill = limiter.refill(10); 366 assertEquals("First refill should give full limit", 10, firstRefill); 367 368 limiter.consume(10); 369 370 // After exactly one refill interval (500ms), should get interval-adjusted amount 371 testEdge.setValue(1000 + 500); 372 long adjustedRefill = limiter.refill(10); 373 374 // 500ms is half of 1000ms time unit, so should get half the limit = 5 375 assertEquals("Refill after one interval should be interval-adjusted", 5, adjustedRefill); 376 } 377 378 @Test 379 public void testBackoffMultiplierBottomsAtOne() { 380 FeedbackAdaptiveRateLimiter limiter = factory.create(); 381 limiter.set(10, TimeUnit.SECONDS); 382 383 testEdge.setValue(1000); 384 limiter.refill(10); 385 386 // Record baseline wait with no backoff applied 387 limiter.consume(10); 388 long baselineWait = limiter.getWaitInterval(10, 0, 1); 389 390 // Run many intervals without contention to ensure multiplier stays at 1.0 391 for (int i = 0; i < 20; i++) { 392 testEdge.setValue(1000 + (i + 1) * 500); 393 limiter.refill(10); 394 limiter.consume(3); // No contention 395 } 396 397 // Wait interval should still be at baseline (multiplier = 1.0) 398 testEdge.setValue(11500); 399 limiter.refill(10); 400 limiter.consume(10); 401 long noContentionWait = limiter.getWaitInterval(10, 0, 1); 402 403 assertEquals("Wait interval should not go below baseline (multiplier=1.0)", baselineWait, 404 noContentionWait); 405 } 406 407 @Test 408 public void testConcurrentAccess() throws InterruptedException { 409 FeedbackAdaptiveRateLimiter limiter = factory.create(); 410 limiter.set(100, TimeUnit.SECONDS); 411 412 testEdge.setValue(1000); 413 limiter.refill(100); 414 415 // Simulate concurrent access 416 Thread[] threads = new Thread[10]; 417 for (int i = 0; i < threads.length; i++) { 418 threads[i] = new Thread(() -> { 419 for (int j = 0; j < 10; j++) { 420 limiter.consume(1); 421 limiter.getWaitInterval(100, 50, 1); 422 } 423 }); 424 } 425 426 for (Thread thread : threads) { 427 thread.start(); 428 } 429 430 for (Thread thread : threads) { 431 thread.join(); 432 } 433 434 // Should complete without exceptions - basic thread safety verification 435 assertTrue("Concurrent access should complete successfully", true); 436 } 437 438 @Test 439 public void testOverconsumptionBehavior() { 440 FeedbackAdaptiveRateLimiter limiter = factory.create(); 441 limiter.set(10, TimeUnit.SECONDS); 442 443 testEdge.setValue(1000); 444 limiter.refill(10); 445 446 // Over-consume significantly 447 limiter.consume(20); 448 449 // Should require waiting for multiple intervals (500ms refill interval) 450 long waitInterval = limiter.getWaitInterval(10, -10, 1); 451 assertTrue("Should require substantial wait after over-consumption", waitInterval >= 500); 452 } 453 454 @Test 455 public void testOscillatingLoadPattern() { 456 FeedbackAdaptiveRateLimiter limiter = factory.create(); 457 limiter.set(10, TimeUnit.SECONDS); 458 459 testEdge.setValue(1000); 460 limiter.refill(10); 461 462 // Oscillate between high contention and low contention 463 for (int cycle = 0; cycle < 3; cycle++) { 464 // High contention phase - increase backoff 465 for (int i = 0; i < 3; i++) { 466 testEdge.setValue(1000 + (cycle * 3000) + (i * 500)); 467 limiter.refill(10); 468 limiter.consume(10); 469 limiter.getWaitInterval(10, 0, 1); // Create contention 470 } 471 472 long highContentionWait = limiter.getWaitInterval(10, 0, 1); 473 474 // Low contention phase - decrease backoff 475 for (int i = 0; i < 3; i++) { 476 testEdge.setValue(1000 + (cycle * 3000) + 1500 + (i * 500)); 477 limiter.refill(10); 478 limiter.consume(3); // No contention 479 } 480 481 testEdge.setValue(1000 + (cycle * 3000) + 3000); 482 limiter.refill(10); 483 limiter.consume(10); 484 long lowContentionWait = limiter.getWaitInterval(10, 0, 1); 485 486 // After low contention phase, wait should be lower than after high contention 487 assertTrue( 488 "Wait should decrease after low contention phase in cycle " + cycle + ". High: " 489 + highContentionWait + ", Low: " + lowContentionWait, 490 lowContentionWait < highContentionWait); 491 } 492 } 493 494 @Test 495 public void testUtilizationEmaConvergence() { 496 FeedbackAdaptiveRateLimiter limiter = factory.create(); 497 limiter.set(10, TimeUnit.SECONDS); 498 499 testEdge.setValue(1000); 500 limiter.refill(10); 501 502 // Consistently consume at 80% utilization 503 for (int i = 0; i < 30; i++) { 504 testEdge.setValue(1000 + (i + 1) * 500); 505 limiter.refill(10); 506 limiter.consume(4); // 4 out of 5 interval-adjusted = 80% 507 } 508 509 // After many intervals, oversubscription should stabilize 510 // At 80% utilization (below 90% target), oversubscription should increase 511 testEdge.setValue(16500); 512 limiter.refill(10); 513 514 // Now switch to 100% utilization 515 for (int i = 0; i < 30; i++) { 516 testEdge.setValue(16500 + (i + 1) * 500); 517 long refilled = limiter.refill(10); 518 limiter.consume((int) refilled); // Consume everything 519 } 520 521 // At 100% utilization (within target range), oversubscription should stabilize 522 testEdge.setValue(32000); 523 limiter.refill(10); 524 525 // The EMA should have adjusted, and refills should be different 526 // (though exact values depend on EMA convergence rate) 527 assertTrue("Refill behavior should adapt to utilization patterns", true); 528 } 529 530 private static final class ManualEnvironmentEdge implements EnvironmentEdge { 531 private long currentTime = 1000; 532 533 public void setValue(long time) { 534 this.currentTime = time; 535 } 536 537 @Override 538 public long currentTime() { 539 return currentTime; 540 } 541 } 542}