001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.testclassification.RegionServerTests;
027import org.apache.hadoop.hbase.testclassification.SmallTests;
028import org.apache.hadoop.hbase.util.EnvironmentEdge;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.junit.jupiter.api.AfterEach;
031import org.junit.jupiter.api.BeforeEach;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035/**
036 * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive backoff multipliers and
037 * over-subscription functionality.
038 */
039@Tag(RegionServerTests.TAG)
040@Tag(SmallTests.TAG)
041public class TestFeedbackAdaptiveRateLimiter {
042
043  private ManualEnvironmentEdge testEdge;
044  private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory factory;
045
046  @BeforeEach
047  public void setUp() {
048    testEdge = new ManualEnvironmentEdge();
049    EnvironmentEdgeManager.injectEdge(testEdge);
050
051    Configuration conf = HBaseConfiguration.create();
052    // Set refill interval for testing
053    conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500);
054    // Configure adaptive parameters for testing - using larger values than defaults for
055    // observability
056    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1);
057    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
058      0.05);
059    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0);
060    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01);
061    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005);
062    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2);
063    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1);
064
065    factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
066  }
067
068  @AfterEach
069  public void tearDown() {
070    EnvironmentEdgeManager.reset();
071  }
072
073  @Test
074  public void testBasicFunctionality() {
075    FeedbackAdaptiveRateLimiter limiter = factory.create();
076    limiter.set(10, TimeUnit.SECONDS);
077
078    // Initially should work like normal rate limiter
079    assertEquals(0, limiter.getWaitIntervalMs());
080    limiter.consume(5);
081    assertEquals(0, limiter.getWaitIntervalMs());
082    limiter.consume(5);
083
084    // Should need to wait after consuming full limit
085    assertTrue(limiter.getWaitIntervalMs() > 0);
086  }
087
088  @Test
089  public void testAdaptiveBackoffIncreases() {
090    FeedbackAdaptiveRateLimiter limiter = factory.create();
091    limiter.set(10, TimeUnit.SECONDS);
092
093    testEdge.setValue(1000);
094    limiter.refill(10);
095
096    // Record initial wait interval
097    limiter.consume(10);
098    long initialWaitInterval = limiter.getWaitInterval(10, 0, 1);
099    assertTrue(initialWaitInterval > 0, "Initial wait interval should be positive");
100
101    // Create sustained contention over multiple intervals to increase backoff
102    for (int i = 0; i < 5; i++) {
103      testEdge.setValue(1000 + (i + 1) * 500);
104      limiter.refill(10);
105      limiter.consume(10);
106      // Create contention by asking for more than available
107      limiter.getWaitInterval(10, 0, 1);
108    }
109
110    // After contention, wait interval should increase due to backoff multiplier
111    testEdge.setValue(4000);
112    limiter.refill(10);
113    limiter.consume(10);
114    long increasedWaitInterval = limiter.getWaitInterval(10, 0, 1);
115
116    // With backoffMultiplierIncrement=0.1 and 5 intervals of contention,
117    // multiplier should be around 1.5, so wait should be significantly higher
118    assertTrue(increasedWaitInterval > initialWaitInterval * 1.3,
119      "Wait interval should increase with contention. Initial: " + initialWaitInterval
120        + ", After contention: " + increasedWaitInterval);
121  }
122
123  @Test
124  public void testAdaptiveBackoffDecreases() {
125    FeedbackAdaptiveRateLimiter limiter = factory.create();
126    limiter.set(10, TimeUnit.SECONDS);
127
128    testEdge.setValue(1000);
129    limiter.refill(10);
130
131    // Build up contention to increase backoff multiplier
132    for (int i = 0; i < 5; i++) {
133      limiter.consume(10);
134      limiter.getWaitInterval(10, 0, 1); // Create contention
135      testEdge.setValue(1000 + (i + 1) * 500);
136      limiter.refill(10);
137    }
138
139    // Measure wait interval with elevated backoff
140    limiter.consume(10);
141    long elevatedWaitInterval = limiter.getWaitInterval(10, 0, 1);
142
143    // Run several intervals without contention to decrease backoff
144    for (int i = 0; i < 10; i++) {
145      testEdge.setValue(4000 + i * 500);
146      limiter.refill(10);
147      // Consume less than available - no contention
148      limiter.consume(3);
149    }
150
151    // Measure wait interval after backoff reduction
152    testEdge.setValue(9500);
153    limiter.refill(10);
154    limiter.consume(10);
155    long reducedWaitInterval = limiter.getWaitInterval(10, 0, 1);
156
157    // After 10 intervals without contention (decrement=0.05 each),
158    // multiplier should decrease by ~0.5, making wait interval lower
159    assertTrue(reducedWaitInterval < elevatedWaitInterval * 0.9,
160      "Wait interval should decrease without contention. Elevated: " + elevatedWaitInterval
161        + ", Reduced: " + reducedWaitInterval);
162  }
163
164  @Test
165  public void testOversubscriptionIncreasesWithLowUtilization() {
166    FeedbackAdaptiveRateLimiter limiter = factory.create();
167    limiter.set(10, TimeUnit.SECONDS);
168
169    testEdge.setValue(1000);
170
171    // Initial refill to set up the limiter
172    long initialRefill = limiter.refill(10);
173    assertEquals(10, initialRefill, "Initial refill should match limit");
174
175    // Create low utilization scenario (consuming much less than available)
176    // With error budget of 0.1, min target utilization is 0.9
177    // We'll consume only ~40% to trigger oversubscription increase
178    // Refill interval adjusted limit is 5 (500ms / 1000ms * 10)
179    for (int i = 0; i < 30; i++) {
180      // Consume before advancing time so utilization is tracked
181      limiter.consume(2); // 2 out of 5 = 40% utilization
182      testEdge.setValue(1000 + (i + 1) * 500);
183      limiter.refill(10);
184    }
185
186    // After many intervals of low utilization, oversubscription should have increased
187    // Now test that the oversubscription proportion actually affects refill behavior
188    // Consume all available to start fresh
189    limiter.consume((int) limiter.getAvailable());
190
191    // Jump forward by 3 refill intervals (1500ms)
192    // This tests that refill can return more than the base limit due to oversubscription
193    testEdge.setValue(16000 + 1500);
194    long multiIntervalRefill = limiter.refill(10);
195
196    // With oversubscription at max (0.2), the oversubscribed limit is 10 * 1.2 = 12
197    // With 3 intervals: refillAmount = 3 * 5 = 15
198    // Result = min(12, 15) = 12, which exceeds the base limit of 10
199    // Without oversubscription, this would be capped at min(10, 15) = 10
200    assertTrue(multiIntervalRefill > 10,
201      "With oversubscription from low utilization, refill should exceed base limit. Got: "
202        + multiIntervalRefill);
203  }
204
205  @Test
206  public void testOversubscriptionDecreasesWithHighUtilization() {
207    FeedbackAdaptiveRateLimiter limiter = factory.create();
208    limiter.set(10, TimeUnit.SECONDS);
209
210    testEdge.setValue(1000);
211
212    // First, build up oversubscription with low utilization
213    limiter.refill(10);
214    for (int i = 0; i < 15; i++) {
215      testEdge.setValue(1000 + (i + 1) * 500);
216      limiter.refill(10);
217      limiter.consume(2); // Low utilization
218    }
219
220    // Now create high utilization scenario (consuming more than target)
221    // With error budget of 0.1, max target utilization is 1.1
222    // We'll consume close to the full interval-adjusted limit to trigger decrease
223    for (int i = 0; i < 10; i++) {
224      testEdge.setValue(8500 + (i + 1) * 500);
225      long refilled = limiter.refill(10);
226      // Consume full amount to show high utilization
227      limiter.consume((int) refilled);
228    }
229
230    // After intervals of high utilization, oversubscription should decrease
231    testEdge.setValue(14000);
232    long refillAfterHighUtil = limiter.refill(10);
233
234    // Oversubscription should have decreased, so refill should be closer to base limit
235    // With oversubscriptionDecrement=0.005 over 10 intervals, it should drop by ~0.05
236    assertTrue(refillAfterHighUtil <= 6,
237      "Refill should be closer to base after high utilization. Got: " + refillAfterHighUtil);
238  }
239
240  @Test
241  public void testBackoffMultiplierCapsAtMaximum() {
242    FeedbackAdaptiveRateLimiter limiter = factory.create();
243    limiter.set(10, TimeUnit.SECONDS);
244
245    testEdge.setValue(1000);
246    limiter.refill(10);
247
248    // Record base wait interval
249    limiter.consume(10);
250    long baseWaitInterval = limiter.getWaitInterval(10, 0, 1);
251
252    // Create extreme sustained contention to push backoff to max
253    // With increment=0.1 and max=3.0, we need (3.0-1.0)/0.1 = 20 intervals
254    for (int i = 0; i < 25; i++) {
255      testEdge.setValue(1000 + (i + 1) * 500);
256      limiter.refill(10);
257      limiter.consume(10);
258      limiter.getWaitInterval(10, 0, 1); // Create contention
259    }
260
261    // Measure wait at maximum backoff
262    testEdge.setValue(14000);
263    limiter.refill(10);
264    limiter.consume(10);
265    long maxBackoffWaitInterval = limiter.getWaitInterval(10, 0, 1);
266
267    // Wait interval should be approximately 3x base (max multiplier)
268    assertTrue(
269      maxBackoffWaitInterval >= baseWaitInterval * 2.5
270        && maxBackoffWaitInterval <= baseWaitInterval * 3.5,
271      "Wait interval should cap at max multiplier. Base: " + baseWaitInterval + ", Max backoff: "
272        + maxBackoffWaitInterval);
273
274    // Additional contention should not increase wait further
275    testEdge.setValue(14500);
276    limiter.refill(10);
277    limiter.consume(10);
278    limiter.getWaitInterval(10, 0, 1);
279
280    testEdge.setValue(15000);
281    limiter.refill(10);
282    limiter.consume(10);
283    long stillMaxWaitInterval = limiter.getWaitInterval(10, 0, 1);
284
285    // Should still be at max, not increasing further
286    assertTrue(Math.abs(stillMaxWaitInterval - maxBackoffWaitInterval) < baseWaitInterval * 0.2,
287      "Wait should remain capped. Previous: " + maxBackoffWaitInterval + ", Current: "
288        + stillMaxWaitInterval);
289  }
290
291  @Test
292  public void testOversubscriptionCapsAtMaximum() {
293    FeedbackAdaptiveRateLimiter limiter = factory.create();
294    limiter.set(10, TimeUnit.SECONDS);
295
296    testEdge.setValue(1000);
297    limiter.refill(10);
298
299    // Create extreme low utilization to push oversubscription to max
300    // With increment=0.01 and max=0.2, we need 0.2/0.01 = 20 intervals
301    for (int i = 0; i < 25; i++) {
302      testEdge.setValue(1000 + (i + 1) * 500);
303      limiter.refill(10);
304      // Very low consumption to maximize oversubscription increase
305      limiter.consume(1);
306    }
307
308    // Check that refill is capped at max oversubscription
309    testEdge.setValue(14000);
310    long refillWithMaxOversubscription = limiter.refill(10);
311
312    // With max oversubscription of 0.2, refill should be at most 5 * 1.2 = 6
313    // (5 is the interval-adjusted limit for 500ms refill interval)
314    assertTrue(refillWithMaxOversubscription <= 7,
315      "Refill should cap at max oversubscription. Got: " + refillWithMaxOversubscription);
316
317    // Further low utilization should not increase refill
318    testEdge.setValue(14500);
319    limiter.refill(10);
320    limiter.consume(1);
321
322    testEdge.setValue(15000);
323    long stillMaxRefill = limiter.refill(10);
324
325    // Should remain at cap
326    assertEquals(refillWithMaxOversubscription, stillMaxRefill,
327      "Refill should remain at max oversubscription");
328  }
329
330  @Test
331  public void testMultipleRefillIntervals() {
332    FeedbackAdaptiveRateLimiter limiter = factory.create();
333    limiter.set(10, TimeUnit.SECONDS);
334
335    testEdge.setValue(1000);
336    limiter.refill(10);
337    limiter.consume(10);
338
339    // Jump forward by multiple refill intervals (3 intervals = 1500ms)
340    testEdge.setValue(1000 + 1500);
341
342    // Should refill 3 intervals worth, but capped at oversubscribed limit
343    long multiIntervalRefill = limiter.refill(10);
344
345    // With 500ms refill interval, each interval gives 5 resources
346    // 3 intervals = 15, but capped at limit (no oversubscription yet) = 10
347    assertTrue(multiIntervalRefill >= 10,
348      "Multiple interval refill should provide multiple refill amounts. Got: "
349        + multiIntervalRefill);
350  }
351
352  @Test
353  public void testRefillIntervalAdjustment() {
354    FeedbackAdaptiveRateLimiter limiter = factory.create();
355    limiter.set(10, TimeUnit.SECONDS);
356
357    testEdge.setValue(1000);
358
359    // First refill should give full limit
360    long firstRefill = limiter.refill(10);
361    assertEquals(10, firstRefill, "First refill should give full limit");
362
363    limiter.consume(10);
364
365    // After exactly one refill interval (500ms), should get interval-adjusted amount
366    testEdge.setValue(1000 + 500);
367    long adjustedRefill = limiter.refill(10);
368
369    // 500ms is half of 1000ms time unit, so should get half the limit = 5
370    assertEquals(5, adjustedRefill, "Refill after one interval should be interval-adjusted");
371  }
372
373  @Test
374  public void testBackoffMultiplierBottomsAtOne() {
375    FeedbackAdaptiveRateLimiter limiter = factory.create();
376    limiter.set(10, TimeUnit.SECONDS);
377
378    testEdge.setValue(1000);
379    limiter.refill(10);
380
381    // Record baseline wait with no backoff applied
382    limiter.consume(10);
383    long baselineWait = limiter.getWaitInterval(10, 0, 1);
384
385    // Run many intervals without contention to ensure multiplier stays at 1.0
386    for (int i = 0; i < 20; i++) {
387      testEdge.setValue(1000 + (i + 1) * 500);
388      limiter.refill(10);
389      limiter.consume(3); // No contention
390    }
391
392    // Wait interval should still be at baseline (multiplier = 1.0)
393    testEdge.setValue(11500);
394    limiter.refill(10);
395    limiter.consume(10);
396    long noContentionWait = limiter.getWaitInterval(10, 0, 1);
397
398    assertEquals(baselineWait, noContentionWait,
399      "Wait interval should not go below baseline (multiplier=1.0)");
400  }
401
402  @Test
403  public void testConcurrentAccess() throws InterruptedException {
404    FeedbackAdaptiveRateLimiter limiter = factory.create();
405    limiter.set(100, TimeUnit.SECONDS);
406
407    testEdge.setValue(1000);
408    limiter.refill(100);
409
410    // Simulate concurrent access
411    Thread[] threads = new Thread[10];
412    for (int i = 0; i < threads.length; i++) {
413      threads[i] = new Thread(() -> {
414        for (int j = 0; j < 10; j++) {
415          limiter.consume(1);
416          limiter.getWaitInterval(100, 50, 1);
417        }
418      });
419    }
420
421    for (Thread thread : threads) {
422      thread.start();
423    }
424
425    for (Thread thread : threads) {
426      thread.join();
427    }
428
429    // Should complete without exceptions - basic thread safety verification
430    assertTrue(true, "Concurrent access should complete successfully");
431  }
432
433  @Test
434  public void testOverconsumptionBehavior() {
435    FeedbackAdaptiveRateLimiter limiter = factory.create();
436    limiter.set(10, TimeUnit.SECONDS);
437
438    testEdge.setValue(1000);
439    limiter.refill(10);
440
441    // Over-consume significantly
442    limiter.consume(20);
443
444    // Should require waiting for multiple intervals (500ms refill interval)
445    long waitInterval = limiter.getWaitInterval(10, -10, 1);
446    assertTrue(waitInterval >= 500, "Should require substantial wait after over-consumption");
447  }
448
449  @Test
450  public void testOscillatingLoadPattern() {
451    FeedbackAdaptiveRateLimiter limiter = factory.create();
452    limiter.set(10, TimeUnit.SECONDS);
453
454    testEdge.setValue(1000);
455    limiter.refill(10);
456
457    // Oscillate between high contention and low contention
458    for (int cycle = 0; cycle < 3; cycle++) {
459      // High contention phase - increase backoff
460      for (int i = 0; i < 3; i++) {
461        testEdge.setValue(1000 + (cycle * 3000) + (i * 500));
462        limiter.refill(10);
463        limiter.consume(10);
464        limiter.getWaitInterval(10, 0, 1); // Create contention
465      }
466
467      long highContentionWait = limiter.getWaitInterval(10, 0, 1);
468
469      // Low contention phase - decrease backoff
470      for (int i = 0; i < 3; i++) {
471        testEdge.setValue(1000 + (cycle * 3000) + 1500 + (i * 500));
472        limiter.refill(10);
473        limiter.consume(3); // No contention
474      }
475
476      testEdge.setValue(1000 + (cycle * 3000) + 3000);
477      limiter.refill(10);
478      limiter.consume(10);
479      long lowContentionWait = limiter.getWaitInterval(10, 0, 1);
480
481      // After low contention phase, wait should be lower than after high contention
482      assertTrue(lowContentionWait < highContentionWait,
483        "Wait should decrease after low contention phase in cycle " + cycle + ". High: "
484          + highContentionWait + ", Low: " + lowContentionWait);
485    }
486  }
487
488  @Test
489  public void testUtilizationEmaConvergence() {
490    FeedbackAdaptiveRateLimiter limiter = factory.create();
491    limiter.set(10, TimeUnit.SECONDS);
492
493    testEdge.setValue(1000);
494    limiter.refill(10);
495
496    // Consistently consume at 80% utilization
497    for (int i = 0; i < 30; i++) {
498      testEdge.setValue(1000 + (i + 1) * 500);
499      limiter.refill(10);
500      limiter.consume(4); // 4 out of 5 interval-adjusted = 80%
501    }
502
503    // After many intervals, oversubscription should stabilize
504    // At 80% utilization (below 90% target), oversubscription should increase
505    testEdge.setValue(16500);
506    limiter.refill(10);
507
508    // Now switch to 100% utilization
509    for (int i = 0; i < 30; i++) {
510      testEdge.setValue(16500 + (i + 1) * 500);
511      long refilled = limiter.refill(10);
512      limiter.consume((int) refilled); // Consume everything
513    }
514
515    // At 100% utilization (within target range), oversubscription should stabilize
516    testEdge.setValue(32000);
517    limiter.refill(10);
518
519    // The EMA should have adjusted, and refills should be different
520    // (though exact values depend on EMA convergence rate)
521    assertTrue(true, "Refill behavior should adapt to utilization patterns");
522  }
523
524  private static final class ManualEnvironmentEdge implements EnvironmentEdge {
525    private long currentTime = 1000;
526
527    public void setValue(long time) {
528      this.currentTime = time;
529    }
530
531    @Override
532    public long currentTime() {
533      return currentTime;
534    }
535  }
536}