/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive backoff multipliers and
 * over-subscription functionality.
 * <p>
 * Every test drives time through a manually controlled {@link EnvironmentEdge} that is injected
 * into {@link EnvironmentEdgeManager} in {@link #setUp()} and removed again in
 * {@link #tearDown()}. The limiter is configured with a 500ms refill interval.
 */
@Tag(RegionServerTests.TAG)
@Tag(SmallTests.TAG)
public class TestFeedbackAdaptiveRateLimiter {

  private ManualEnvironmentEdge testEdge;
  private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory factory;

  /**
   * Injects the manual clock and builds a limiter factory from a configuration with explicit
   * adaptive parameters.
   */
  @BeforeEach
  public void setUp() {
    testEdge = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(testEdge);

    Configuration conf = HBaseConfiguration.create();
    // Set refill interval for testing
    conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500);
    // Configure adaptive parameters for testing - using larger values than defaults for
    // observability
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1);
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
      0.05);
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0);
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01);
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005);
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2);
    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1);

    factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
  }

  /** Resets {@link EnvironmentEdgeManager} so the manual clock does not leak into other tests. */
  @AfterEach
  public void tearDown() {
    EnvironmentEdgeManager.reset();
  }

  /** Consuming the full limit of 10 in two steps should leave a positive wait interval. */
  @Test
  public void testBasicFunctionality() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    // Initially should work like normal rate limiter
    assertEquals(0, limiter.getWaitIntervalMs());
    limiter.consume(5);
    assertEquals(0, limiter.getWaitIntervalMs());
    limiter.consume(5);

    // Should need to wait after consuming full limit
    assertTrue(limiter.getWaitIntervalMs() > 0);
  }

  /**
   * After five refill intervals of contention the wait interval should be more than 1.3x the
   * initially observed one.
   */
  @Test
  public void testAdaptiveBackoffIncreases() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Record initial wait interval
    limiter.consume(10);
    long initialWaitInterval = limiter.getWaitInterval(10, 0, 1);
    assertTrue(initialWaitInterval > 0, "Initial wait interval should be positive");

    // Create sustained contention over multiple intervals to increase backoff
    for (int i = 0; i < 5; i++) {
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
      limiter.consume(10);
      // Create contention by asking for more than available
      limiter.getWaitInterval(10, 0, 1);
    }

    // After contention, wait interval should increase due to backoff multiplier
    testEdge.setValue(4000);
    limiter.refill(10);
    limiter.consume(10);
    long increasedWaitInterval = limiter.getWaitInterval(10, 0, 1);

    // With backoffMultiplierIncrement=0.1 and 5 intervals of contention,
    // multiplier should be around 1.5, so wait should be significantly higher
    assertTrue(increasedWaitInterval > initialWaitInterval * 1.3,
      "Wait interval should increase with contention. Initial: " + initialWaitInterval
        + ", After contention: " + increasedWaitInterval);
  }

  /**
   * After building up backoff and then running ten intervals without contention, the wait interval
   * should drop below 0.9x the elevated one.
   */
  @Test
  public void testAdaptiveBackoffDecreases() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Build up contention to increase backoff multiplier
    for (int i = 0; i < 5; i++) {
      limiter.consume(10);
      limiter.getWaitInterval(10, 0, 1); // Create contention
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
    }

    // Measure wait interval with elevated backoff
    limiter.consume(10);
    long elevatedWaitInterval = limiter.getWaitInterval(10, 0, 1);

    // Run several intervals without contention to decrease backoff
    for (int i = 0; i < 10; i++) {
      testEdge.setValue(4000 + i * 500);
      limiter.refill(10);
      // Consume less than available - no contention
      limiter.consume(3);
    }

    // Measure wait interval after backoff reduction
    testEdge.setValue(9500);
    limiter.refill(10);
    limiter.consume(10);
    long reducedWaitInterval = limiter.getWaitInterval(10, 0, 1);

    // After 10 intervals without contention (decrement=0.05 each),
    // multiplier should decrease by ~0.5, making wait interval lower
    assertTrue(reducedWaitInterval < elevatedWaitInterval * 0.9,
      "Wait interval should decrease without contention. Elevated: " + elevatedWaitInterval
        + ", Reduced: " + reducedWaitInterval);
  }

  /**
   * After thirty intervals of low consumption, a refill spanning three intervals should return
   * more than the base limit of 10.
   */
  @Test
  public void testOversubscriptionIncreasesWithLowUtilization() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);

    // Initial refill to set up the limiter
    long initialRefill = limiter.refill(10);
    assertEquals(10, initialRefill, "Initial refill should match limit");

    // Create low utilization scenario (consuming much less than available)
    // With error budget of 0.1, min target utilization is 0.9
    // We'll consume only ~40% to trigger oversubscription increase
    // Refill interval adjusted limit is 5 (500ms / 1000ms * 10)
    for (int i = 0; i < 30; i++) {
      // Consume before advancing time so utilization is tracked
      limiter.consume(2); // 2 out of 5 = 40% utilization
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
    }

    // After many intervals of low utilization, oversubscription should have increased
    // Now test that the oversubscription proportion actually affects refill behavior
    // Consume all available to start fresh
    limiter.consume((int) limiter.getAvailable());

    // Jump forward by 3 refill intervals (1500ms)
    // This tests that refill can return more than the base limit due to oversubscription
    testEdge.setValue(16000 + 1500);
    long multiIntervalRefill = limiter.refill(10);

    // With oversubscription at max (0.2), the oversubscribed limit is 10 * 1.2 = 12
    // With 3 intervals: refillAmount = 3 * 5 = 15
    // Result = min(12, 15) = 12, which exceeds the base limit of 10
    // Without oversubscription, this would be capped at min(10, 15) = 10
    assertTrue(multiIntervalRefill > 10,
      "With oversubscription from low utilization, refill should exceed base limit. Got: "
        + multiIntervalRefill);
  }

  /**
   * After a low-utilization phase followed by ten intervals of full consumption, a single-interval
   * refill should be at most 6.
   */
  @Test
  public void testOversubscriptionDecreasesWithHighUtilization() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);

    // First, build up oversubscription with low utilization
    limiter.refill(10);
    for (int i = 0; i < 15; i++) {
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
      limiter.consume(2); // Low utilization
    }

    // Now create high utilization scenario (consuming more than target)
    // With error budget of 0.1, max target utilization is 1.1
    // We'll consume close to the full interval-adjusted limit to trigger decrease
    for (int i = 0; i < 10; i++) {
      testEdge.setValue(8500 + (i + 1) * 500);
      long refilled = limiter.refill(10);
      // Consume full amount to show high utilization
      limiter.consume((int) refilled);
    }

    // After intervals of high utilization, oversubscription should decrease
    testEdge.setValue(14000);
    long refillAfterHighUtil = limiter.refill(10);

    // Oversubscription should have decreased, so refill should be closer to base limit
    // With oversubscriptionDecrement=0.005 over 10 intervals, it should drop by ~0.05
    assertTrue(refillAfterHighUtil <= 6,
      "Refill should be closer to base after high utilization. Got: " + refillAfterHighUtil);
  }

  /**
   * After 25 intervals of contention the wait interval should sit between 2.5x and 3.5x the base
   * wait, and further contention should not move it by more than 0.2x the base wait.
   */
  @Test
  public void testBackoffMultiplierCapsAtMaximum() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Record base wait interval
    limiter.consume(10);
    long baseWaitInterval = limiter.getWaitInterval(10, 0, 1);

    // Create extreme sustained contention to push backoff to max
    // With increment=0.1 and max=3.0, we need (3.0-1.0)/0.1 = 20 intervals
    for (int i = 0; i < 25; i++) {
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
      limiter.consume(10);
      limiter.getWaitInterval(10, 0, 1); // Create contention
    }

    // Measure wait at maximum backoff
    testEdge.setValue(14000);
    limiter.refill(10);
    limiter.consume(10);
    long maxBackoffWaitInterval = limiter.getWaitInterval(10, 0, 1);

    // Wait interval should be approximately 3x base (max multiplier)
    assertTrue(
      maxBackoffWaitInterval >= baseWaitInterval * 2.5
        && maxBackoffWaitInterval <= baseWaitInterval * 3.5,
      "Wait interval should cap at max multiplier. Base: " + baseWaitInterval + ", Max backoff: "
        + maxBackoffWaitInterval);

    // Additional contention should not increase wait further
    testEdge.setValue(14500);
    limiter.refill(10);
    limiter.consume(10);
    limiter.getWaitInterval(10, 0, 1);

    testEdge.setValue(15000);
    limiter.refill(10);
    limiter.consume(10);
    long stillMaxWaitInterval = limiter.getWaitInterval(10, 0, 1);

    // Should still be at max, not increasing further
    assertTrue(Math.abs(stillMaxWaitInterval - maxBackoffWaitInterval) < baseWaitInterval * 0.2,
      "Wait should remain capped. Previous: " + maxBackoffWaitInterval + ", Current: "
        + stillMaxWaitInterval);
  }

  /**
   * After 25 intervals of very low consumption a single-interval refill should be at most 7 and
   * should not change after one more low-utilization interval.
   */
  @Test
  public void testOversubscriptionCapsAtMaximum() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Create extreme low utilization to push oversubscription to max
    // With increment=0.01 and max=0.2, we need 0.2/0.01 = 20 intervals
    for (int i = 0; i < 25; i++) {
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
      // Very low consumption to maximize oversubscription increase
      limiter.consume(1);
    }

    // Check that refill is capped at max oversubscription
    testEdge.setValue(14000);
    long refillWithMaxOversubscription = limiter.refill(10);

    // With max oversubscription of 0.2, refill should be at most 5 * 1.2 = 6
    // (5 is the interval-adjusted limit for 500ms refill interval)
    assertTrue(refillWithMaxOversubscription <= 7,
      "Refill should cap at max oversubscription. Got: " + refillWithMaxOversubscription);

    // Further low utilization should not increase refill
    testEdge.setValue(14500);
    limiter.refill(10);
    limiter.consume(1);

    testEdge.setValue(15000);
    long stillMaxRefill = limiter.refill(10);

    // Should remain at cap
    assertEquals(refillWithMaxOversubscription, stillMaxRefill,
      "Refill should remain at max oversubscription");
  }

  /** A refill after a jump of three refill intervals should return at least the base limit. */
  @Test
  public void testMultipleRefillIntervals() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);
    limiter.consume(10);

    // Jump forward by multiple refill intervals (3 intervals = 1500ms)
    testEdge.setValue(1000 + 1500);

    // Should refill 3 intervals worth, but capped at oversubscribed limit
    long multiIntervalRefill = limiter.refill(10);

    // With 500ms refill interval, each interval gives 5 resources
    // 3 intervals = 15, but capped at limit (no oversubscription yet) = 10
    assertTrue(multiIntervalRefill >= 10,
      "Multiple interval refill should provide multiple refill amounts. Got: "
        + multiIntervalRefill);
  }

  /** The first refill should return the full limit; one interval later it should return half. */
  @Test
  public void testRefillIntervalAdjustment() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);

    // First refill should give full limit
    long firstRefill = limiter.refill(10);
    assertEquals(10, firstRefill, "First refill should give full limit");

    limiter.consume(10);

    // After exactly one refill interval (500ms), should get interval-adjusted amount
    testEdge.setValue(1000 + 500);
    long adjustedRefill = limiter.refill(10);

    // 500ms is half of 1000ms time unit, so should get half the limit = 5
    assertEquals(5, adjustedRefill, "Refill after one interval should be interval-adjusted");
  }

  /**
   * Twenty intervals without contention should leave the wait interval equal to the baseline
   * measured at the start.
   */
  @Test
  public void testBackoffMultiplierBottomsAtOne() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Record baseline wait with no backoff applied
    limiter.consume(10);
    long baselineWait = limiter.getWaitInterval(10, 0, 1);

    // Run many intervals without contention to ensure multiplier stays at 1.0
    for (int i = 0; i < 20; i++) {
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
      limiter.consume(3); // No contention
    }

    // Wait interval should still be at baseline (multiplier = 1.0)
    testEdge.setValue(11500);
    limiter.refill(10);
    limiter.consume(10);
    long noContentionWait = limiter.getWaitInterval(10, 0, 1);

    assertEquals(baselineWait, noContentionWait,
      "Wait interval should not go below baseline (multiplier=1.0)");
  }

  /**
   * Ten threads each call {@code consume} and {@code getWaitInterval} ten times on a shared
   * limiter; the test fails if any of them terminates with an uncaught exception.
   */
  @Test
  public void testConcurrentAccess() throws InterruptedException {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(100, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(100);

    // An exception thrown inside a worker thread never reaches the JUnit thread by itself, so
    // remember the first one and assert on it once all workers have been joined.
    AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    // Simulate concurrent access
    Thread[] threads = new Thread[10];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 10; j++) {
          limiter.consume(1);
          limiter.getWaitInterval(100, 50, 1);
        }
      });
      threads[i].setUncaughtExceptionHandler(
        (thread, failure) -> firstFailure.compareAndSet(null, failure));
    }

    for (Thread thread : threads) {
      thread.start();
    }

    for (Thread thread : threads) {
      thread.join();
    }

    // Should complete without exceptions - basic thread safety verification
    assertNull(firstFailure.get(), "Concurrent access should complete successfully");
  }

  /** Consuming 20 against a limit of 10 should require a wait of at least 500ms. */
  @Test
  public void testOverconsumptionBehavior() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Over-consume significantly
    limiter.consume(20);

    // Should require waiting for multiple intervals (500ms refill interval)
    long waitInterval = limiter.getWaitInterval(10, -10, 1);
    assertTrue(waitInterval >= 500, "Should require substantial wait after over-consumption");
  }

  /**
   * Runs three cycles of a contended phase followed by an uncontended phase; in every cycle the
   * wait measured after the uncontended phase should be lower than the one measured after the
   * contended phase.
   */
  @Test
  public void testOscillatingLoadPattern() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Oscillate between high contention and low contention
    for (int cycle = 0; cycle < 3; cycle++) {
      // High contention phase - increase backoff
      for (int i = 0; i < 3; i++) {
        testEdge.setValue(1000 + (cycle * 3000) + (i * 500));
        limiter.refill(10);
        limiter.consume(10);
        limiter.getWaitInterval(10, 0, 1); // Create contention
      }

      long highContentionWait = limiter.getWaitInterval(10, 0, 1);

      // Low contention phase - decrease backoff
      for (int i = 0; i < 3; i++) {
        testEdge.setValue(1000 + (cycle * 3000) + 1500 + (i * 500));
        limiter.refill(10);
        limiter.consume(3); // No contention
      }

      testEdge.setValue(1000 + (cycle * 3000) + 3000);
      limiter.refill(10);
      limiter.consume(10);
      long lowContentionWait = limiter.getWaitInterval(10, 0, 1);

      // After low contention phase, wait should be lower than after high contention
      assertTrue(lowContentionWait < highContentionWait,
        "Wait should decrease after low contention phase in cycle " + cycle + ". High: "
          + highContentionWait + ", Low: " + lowContentionWait);
    }
  }

  /**
   * Drives thirty intervals at 80% consumption followed by thirty intervals of full consumption.
   * <p>
   * NOTE(review): the final assertion is vacuous, so this test currently only checks that the
   * sequence completes without throwing. Asserting on concrete refill values needs the limiter's
   * EMA parameters, which are not visible from this test - confirm and tighten.
   */
  @Test
  public void testUtilizationEmaConvergence() {
    FeedbackAdaptiveRateLimiter limiter = factory.create();
    limiter.set(10, TimeUnit.SECONDS);

    testEdge.setValue(1000);
    limiter.refill(10);

    // Consistently consume at 80% utilization
    for (int i = 0; i < 30; i++) {
      testEdge.setValue(1000 + (i + 1) * 500);
      limiter.refill(10);
      limiter.consume(4); // 4 out of 5 interval-adjusted = 80%
    }

    // After many intervals, oversubscription should stabilize
    // At 80% utilization (below 90% target), oversubscription should increase
    testEdge.setValue(16500);
    limiter.refill(10);

    // Now switch to 100% utilization
    for (int i = 0; i < 30; i++) {
      testEdge.setValue(16500 + (i + 1) * 500);
      long refilled = limiter.refill(10);
      limiter.consume((int) refilled); // Consume everything
    }

    // At 100% utilization (within target range), oversubscription should stabilize
    testEdge.setValue(32000);
    limiter.refill(10);

    // The EMA should have adjusted, and refills should be different
    // (though exact values depend on EMA convergence rate)
    assertTrue(true, "Refill behavior should adapt to utilization patterns");
  }

  /** {@link EnvironmentEdge} whose time only changes when a test sets it explicitly. */
  private static final class ManualEnvironmentEdge implements EnvironmentEdge {
    private long currentTime = 1000;

    /** Sets the value that {@link #currentTime()} returns from now on. */
    public void setValue(long time) {
      this.currentTime = time;
    }

    @Override
    public long currentTime() {
      return currentTime;
    }
  }
}