001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.testclassification.RegionServerTests;
028import org.apache.hadoop.hbase.testclassification.SmallTests;
029import org.apache.hadoop.hbase.util.EnvironmentEdge;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.junit.After;
032import org.junit.Before;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036
037/**
038 * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive backoff multipliers and
039 * over-subscription functionality.
040 */
041@Category({ RegionServerTests.class, SmallTests.class })
042public class TestFeedbackAdaptiveRateLimiter {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestFeedbackAdaptiveRateLimiter.class);
047
048  private ManualEnvironmentEdge testEdge;
049  private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory factory;
050
051  @Before
052  public void setUp() {
053    testEdge = new ManualEnvironmentEdge();
054    EnvironmentEdgeManager.injectEdge(testEdge);
055
056    Configuration conf = HBaseConfiguration.create();
057    // Set refill interval for testing
058    conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500);
059    // Configure adaptive parameters for testing - using larger values than defaults for
060    // observability
061    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1);
062    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
063      0.05);
064    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0);
065    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01);
066    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005);
067    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2);
068    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1);
069
070    factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
071  }
072
073  @After
074  public void tearDown() {
075    EnvironmentEdgeManager.reset();
076  }
077
078  @Test
079  public void testBasicFunctionality() {
080    FeedbackAdaptiveRateLimiter limiter = factory.create();
081    limiter.set(10, TimeUnit.SECONDS);
082
083    // Initially should work like normal rate limiter
084    assertEquals(0, limiter.getWaitIntervalMs());
085    limiter.consume(5);
086    assertEquals(0, limiter.getWaitIntervalMs());
087    limiter.consume(5);
088
089    // Should need to wait after consuming full limit
090    assertTrue(limiter.getWaitIntervalMs() > 0);
091  }
092
093  @Test
094  public void testAdaptiveBackoffIncreases() {
095    FeedbackAdaptiveRateLimiter limiter = factory.create();
096    limiter.set(10, TimeUnit.SECONDS);
097
098    testEdge.setValue(1000);
099    limiter.refill(10);
100
101    // Record initial wait interval
102    limiter.consume(10);
103    long initialWaitInterval = limiter.getWaitInterval(10, 0, 1);
104    assertTrue("Initial wait interval should be positive", initialWaitInterval > 0);
105
106    // Create sustained contention over multiple intervals to increase backoff
107    for (int i = 0; i < 5; i++) {
108      testEdge.setValue(1000 + (i + 1) * 500);
109      limiter.refill(10);
110      limiter.consume(10);
111      // Create contention by asking for more than available
112      limiter.getWaitInterval(10, 0, 1);
113    }
114
115    // After contention, wait interval should increase due to backoff multiplier
116    testEdge.setValue(4000);
117    limiter.refill(10);
118    limiter.consume(10);
119    long increasedWaitInterval = limiter.getWaitInterval(10, 0, 1);
120
121    // With backoffMultiplierIncrement=0.1 and 5 intervals of contention,
122    // multiplier should be around 1.5, so wait should be significantly higher
123    assertTrue(
124      "Wait interval should increase with contention. Initial: " + initialWaitInterval
125        + ", After contention: " + increasedWaitInterval,
126      increasedWaitInterval > initialWaitInterval * 1.3);
127  }
128
129  @Test
130  public void testAdaptiveBackoffDecreases() {
131    FeedbackAdaptiveRateLimiter limiter = factory.create();
132    limiter.set(10, TimeUnit.SECONDS);
133
134    testEdge.setValue(1000);
135    limiter.refill(10);
136
137    // Build up contention to increase backoff multiplier
138    for (int i = 0; i < 5; i++) {
139      limiter.consume(10);
140      limiter.getWaitInterval(10, 0, 1); // Create contention
141      testEdge.setValue(1000 + (i + 1) * 500);
142      limiter.refill(10);
143    }
144
145    // Measure wait interval with elevated backoff
146    limiter.consume(10);
147    long elevatedWaitInterval = limiter.getWaitInterval(10, 0, 1);
148
149    // Run several intervals without contention to decrease backoff
150    for (int i = 0; i < 10; i++) {
151      testEdge.setValue(4000 + i * 500);
152      limiter.refill(10);
153      // Consume less than available - no contention
154      limiter.consume(3);
155    }
156
157    // Measure wait interval after backoff reduction
158    testEdge.setValue(9500);
159    limiter.refill(10);
160    limiter.consume(10);
161    long reducedWaitInterval = limiter.getWaitInterval(10, 0, 1);
162
163    // After 10 intervals without contention (decrement=0.05 each),
164    // multiplier should decrease by ~0.5, making wait interval lower
165    assertTrue("Wait interval should decrease without contention. Elevated: " + elevatedWaitInterval
166      + ", Reduced: " + reducedWaitInterval, reducedWaitInterval < elevatedWaitInterval * 0.9);
167  }
168
169  @Test
170  public void testOversubscriptionIncreasesWithLowUtilization() {
171    FeedbackAdaptiveRateLimiter limiter = factory.create();
172    limiter.set(10, TimeUnit.SECONDS);
173
174    testEdge.setValue(1000);
175
176    // Initial refill to set up the limiter
177    long initialRefill = limiter.refill(10);
178    assertEquals("Initial refill should match limit", 10, initialRefill);
179
180    // Create low utilization scenario (consuming much less than available)
181    // With error budget of 0.1, min target utilization is 0.9
182    // We'll consume only ~40% to trigger oversubscription increase
183    // Refill interval adjusted limit is 5 (500ms / 1000ms * 10)
184    for (int i = 0; i < 30; i++) {
185      // Consume before advancing time so utilization is tracked
186      limiter.consume(2); // 2 out of 5 = 40% utilization
187      testEdge.setValue(1000 + (i + 1) * 500);
188      limiter.refill(10);
189    }
190
191    // After many intervals of low utilization, oversubscription should have increased
192    // Now test that the oversubscription proportion actually affects refill behavior
193    // Consume all available to start fresh
194    limiter.consume((int) limiter.getAvailable());
195
196    // Jump forward by 3 refill intervals (1500ms)
197    // This tests that refill can return more than the base limit due to oversubscription
198    testEdge.setValue(16000 + 1500);
199    long multiIntervalRefill = limiter.refill(10);
200
201    // With oversubscription at max (0.2), the oversubscribed limit is 10 * 1.2 = 12
202    // With 3 intervals: refillAmount = 3 * 5 = 15
203    // Result = min(12, 15) = 12, which exceeds the base limit of 10
204    // Without oversubscription, this would be capped at min(10, 15) = 10
205    assertTrue("With oversubscription from low utilization, refill should exceed base limit. Got: "
206      + multiIntervalRefill, multiIntervalRefill > 10);
207  }
208
209  @Test
210  public void testOversubscriptionDecreasesWithHighUtilization() {
211    FeedbackAdaptiveRateLimiter limiter = factory.create();
212    limiter.set(10, TimeUnit.SECONDS);
213
214    testEdge.setValue(1000);
215
216    // First, build up oversubscription with low utilization
217    limiter.refill(10);
218    for (int i = 0; i < 15; i++) {
219      testEdge.setValue(1000 + (i + 1) * 500);
220      limiter.refill(10);
221      limiter.consume(2); // Low utilization
222    }
223
224    // Now create high utilization scenario (consuming more than target)
225    // With error budget of 0.1, max target utilization is 1.1
226    // We'll consume close to the full interval-adjusted limit to trigger decrease
227    for (int i = 0; i < 10; i++) {
228      testEdge.setValue(8500 + (i + 1) * 500);
229      long refilled = limiter.refill(10);
230      // Consume full amount to show high utilization
231      limiter.consume((int) refilled);
232    }
233
234    // After intervals of high utilization, oversubscription should decrease
235    testEdge.setValue(14000);
236    long refillAfterHighUtil = limiter.refill(10);
237
238    // Oversubscription should have decreased, so refill should be closer to base limit
239    // With oversubscriptionDecrement=0.005 over 10 intervals, it should drop by ~0.05
240    assertTrue(
241      "Refill should be closer to base after high utilization. Got: " + refillAfterHighUtil,
242      refillAfterHighUtil <= 6);
243  }
244
245  @Test
246  public void testBackoffMultiplierCapsAtMaximum() {
247    FeedbackAdaptiveRateLimiter limiter = factory.create();
248    limiter.set(10, TimeUnit.SECONDS);
249
250    testEdge.setValue(1000);
251    limiter.refill(10);
252
253    // Record base wait interval
254    limiter.consume(10);
255    long baseWaitInterval = limiter.getWaitInterval(10, 0, 1);
256
257    // Create extreme sustained contention to push backoff to max
258    // With increment=0.1 and max=3.0, we need (3.0-1.0)/0.1 = 20 intervals
259    for (int i = 0; i < 25; i++) {
260      testEdge.setValue(1000 + (i + 1) * 500);
261      limiter.refill(10);
262      limiter.consume(10);
263      limiter.getWaitInterval(10, 0, 1); // Create contention
264    }
265
266    // Measure wait at maximum backoff
267    testEdge.setValue(14000);
268    limiter.refill(10);
269    limiter.consume(10);
270    long maxBackoffWaitInterval = limiter.getWaitInterval(10, 0, 1);
271
272    // Wait interval should be approximately 3x base (max multiplier)
273    assertTrue(
274      "Wait interval should cap at max multiplier. Base: " + baseWaitInterval + ", Max backoff: "
275        + maxBackoffWaitInterval,
276      maxBackoffWaitInterval >= baseWaitInterval * 2.5
277        && maxBackoffWaitInterval <= baseWaitInterval * 3.5);
278
279    // Additional contention should not increase wait further
280    testEdge.setValue(14500);
281    limiter.refill(10);
282    limiter.consume(10);
283    limiter.getWaitInterval(10, 0, 1);
284
285    testEdge.setValue(15000);
286    limiter.refill(10);
287    limiter.consume(10);
288    long stillMaxWaitInterval = limiter.getWaitInterval(10, 0, 1);
289
290    // Should still be at max, not increasing further
291    assertTrue(
292      "Wait should remain capped. Previous: " + maxBackoffWaitInterval + ", Current: "
293        + stillMaxWaitInterval,
294      Math.abs(stillMaxWaitInterval - maxBackoffWaitInterval) < baseWaitInterval * 0.2);
295  }
296
297  @Test
298  public void testOversubscriptionCapsAtMaximum() {
299    FeedbackAdaptiveRateLimiter limiter = factory.create();
300    limiter.set(10, TimeUnit.SECONDS);
301
302    testEdge.setValue(1000);
303    limiter.refill(10);
304
305    // Create extreme low utilization to push oversubscription to max
306    // With increment=0.01 and max=0.2, we need 0.2/0.01 = 20 intervals
307    for (int i = 0; i < 25; i++) {
308      testEdge.setValue(1000 + (i + 1) * 500);
309      limiter.refill(10);
310      // Very low consumption to maximize oversubscription increase
311      limiter.consume(1);
312    }
313
314    // Check that refill is capped at max oversubscription
315    testEdge.setValue(14000);
316    long refillWithMaxOversubscription = limiter.refill(10);
317
318    // With max oversubscription of 0.2, refill should be at most 5 * 1.2 = 6
319    // (5 is the interval-adjusted limit for 500ms refill interval)
320    assertTrue("Refill should cap at max oversubscription. Got: " + refillWithMaxOversubscription,
321      refillWithMaxOversubscription <= 7);
322
323    // Further low utilization should not increase refill
324    testEdge.setValue(14500);
325    limiter.refill(10);
326    limiter.consume(1);
327
328    testEdge.setValue(15000);
329    long stillMaxRefill = limiter.refill(10);
330
331    // Should remain at cap
332    assertEquals("Refill should remain at max oversubscription", refillWithMaxOversubscription,
333      stillMaxRefill);
334  }
335
336  @Test
337  public void testMultipleRefillIntervals() {
338    FeedbackAdaptiveRateLimiter limiter = factory.create();
339    limiter.set(10, TimeUnit.SECONDS);
340
341    testEdge.setValue(1000);
342    limiter.refill(10);
343    limiter.consume(10);
344
345    // Jump forward by multiple refill intervals (3 intervals = 1500ms)
346    testEdge.setValue(1000 + 1500);
347
348    // Should refill 3 intervals worth, but capped at oversubscribed limit
349    long multiIntervalRefill = limiter.refill(10);
350
351    // With 500ms refill interval, each interval gives 5 resources
352    // 3 intervals = 15, but capped at limit (no oversubscription yet) = 10
353    assertTrue("Multiple interval refill should provide multiple refill amounts. Got: "
354      + multiIntervalRefill, multiIntervalRefill >= 10);
355  }
356
357  @Test
358  public void testRefillIntervalAdjustment() {
359    FeedbackAdaptiveRateLimiter limiter = factory.create();
360    limiter.set(10, TimeUnit.SECONDS);
361
362    testEdge.setValue(1000);
363
364    // First refill should give full limit
365    long firstRefill = limiter.refill(10);
366    assertEquals("First refill should give full limit", 10, firstRefill);
367
368    limiter.consume(10);
369
370    // After exactly one refill interval (500ms), should get interval-adjusted amount
371    testEdge.setValue(1000 + 500);
372    long adjustedRefill = limiter.refill(10);
373
374    // 500ms is half of 1000ms time unit, so should get half the limit = 5
375    assertEquals("Refill after one interval should be interval-adjusted", 5, adjustedRefill);
376  }
377
378  @Test
379  public void testBackoffMultiplierBottomsAtOne() {
380    FeedbackAdaptiveRateLimiter limiter = factory.create();
381    limiter.set(10, TimeUnit.SECONDS);
382
383    testEdge.setValue(1000);
384    limiter.refill(10);
385
386    // Record baseline wait with no backoff applied
387    limiter.consume(10);
388    long baselineWait = limiter.getWaitInterval(10, 0, 1);
389
390    // Run many intervals without contention to ensure multiplier stays at 1.0
391    for (int i = 0; i < 20; i++) {
392      testEdge.setValue(1000 + (i + 1) * 500);
393      limiter.refill(10);
394      limiter.consume(3); // No contention
395    }
396
397    // Wait interval should still be at baseline (multiplier = 1.0)
398    testEdge.setValue(11500);
399    limiter.refill(10);
400    limiter.consume(10);
401    long noContentionWait = limiter.getWaitInterval(10, 0, 1);
402
403    assertEquals("Wait interval should not go below baseline (multiplier=1.0)", baselineWait,
404      noContentionWait);
405  }
406
407  @Test
408  public void testConcurrentAccess() throws InterruptedException {
409    FeedbackAdaptiveRateLimiter limiter = factory.create();
410    limiter.set(100, TimeUnit.SECONDS);
411
412    testEdge.setValue(1000);
413    limiter.refill(100);
414
415    // Simulate concurrent access
416    Thread[] threads = new Thread[10];
417    for (int i = 0; i < threads.length; i++) {
418      threads[i] = new Thread(() -> {
419        for (int j = 0; j < 10; j++) {
420          limiter.consume(1);
421          limiter.getWaitInterval(100, 50, 1);
422        }
423      });
424    }
425
426    for (Thread thread : threads) {
427      thread.start();
428    }
429
430    for (Thread thread : threads) {
431      thread.join();
432    }
433
434    // Should complete without exceptions - basic thread safety verification
435    assertTrue("Concurrent access should complete successfully", true);
436  }
437
438  @Test
439  public void testOverconsumptionBehavior() {
440    FeedbackAdaptiveRateLimiter limiter = factory.create();
441    limiter.set(10, TimeUnit.SECONDS);
442
443    testEdge.setValue(1000);
444    limiter.refill(10);
445
446    // Over-consume significantly
447    limiter.consume(20);
448
449    // Should require waiting for multiple intervals (500ms refill interval)
450    long waitInterval = limiter.getWaitInterval(10, -10, 1);
451    assertTrue("Should require substantial wait after over-consumption", waitInterval >= 500);
452  }
453
454  @Test
455  public void testOscillatingLoadPattern() {
456    FeedbackAdaptiveRateLimiter limiter = factory.create();
457    limiter.set(10, TimeUnit.SECONDS);
458
459    testEdge.setValue(1000);
460    limiter.refill(10);
461
462    // Oscillate between high contention and low contention
463    for (int cycle = 0; cycle < 3; cycle++) {
464      // High contention phase - increase backoff
465      for (int i = 0; i < 3; i++) {
466        testEdge.setValue(1000 + (cycle * 3000) + (i * 500));
467        limiter.refill(10);
468        limiter.consume(10);
469        limiter.getWaitInterval(10, 0, 1); // Create contention
470      }
471
472      long highContentionWait = limiter.getWaitInterval(10, 0, 1);
473
474      // Low contention phase - decrease backoff
475      for (int i = 0; i < 3; i++) {
476        testEdge.setValue(1000 + (cycle * 3000) + 1500 + (i * 500));
477        limiter.refill(10);
478        limiter.consume(3); // No contention
479      }
480
481      testEdge.setValue(1000 + (cycle * 3000) + 3000);
482      limiter.refill(10);
483      limiter.consume(10);
484      long lowContentionWait = limiter.getWaitInterval(10, 0, 1);
485
486      // After low contention phase, wait should be lower than after high contention
487      assertTrue(
488        "Wait should decrease after low contention phase in cycle " + cycle + ". High: "
489          + highContentionWait + ", Low: " + lowContentionWait,
490        lowContentionWait < highContentionWait);
491    }
492  }
493
494  @Test
495  public void testUtilizationEmaConvergence() {
496    FeedbackAdaptiveRateLimiter limiter = factory.create();
497    limiter.set(10, TimeUnit.SECONDS);
498
499    testEdge.setValue(1000);
500    limiter.refill(10);
501
502    // Consistently consume at 80% utilization
503    for (int i = 0; i < 30; i++) {
504      testEdge.setValue(1000 + (i + 1) * 500);
505      limiter.refill(10);
506      limiter.consume(4); // 4 out of 5 interval-adjusted = 80%
507    }
508
509    // After many intervals, oversubscription should stabilize
510    // At 80% utilization (below 90% target), oversubscription should increase
511    testEdge.setValue(16500);
512    limiter.refill(10);
513
514    // Now switch to 100% utilization
515    for (int i = 0; i < 30; i++) {
516      testEdge.setValue(16500 + (i + 1) * 500);
517      long refilled = limiter.refill(10);
518      limiter.consume((int) refilled); // Consume everything
519    }
520
521    // At 100% utilization (within target range), oversubscription should stabilize
522    testEdge.setValue(32000);
523    limiter.refill(10);
524
525    // The EMA should have adjusted, and refills should be different
526    // (though exact values depend on EMA convergence rate)
527    assertTrue("Refill behavior should adapt to utilization patterns", true);
528  }
529
530  private static final class ManualEnvironmentEdge implements EnvironmentEdge {
531    private long currentTime = 1000;
532
533    public void setValue(long time) {
534      this.currentTime = time;
535    }
536
537    @Override
538    public long currentTime() {
539      return currentTime;
540    }
541  }
542}