001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.util.concurrent.atomic.AtomicLong;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.yetus.audience.InterfaceStability;
025
026import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
027import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicDouble;
028
029/**
030 * An adaptive rate limiter that dynamically adjusts its behavior based on observed usage patterns
031 * to achieve stable, full utilization of configured quota allowances while managing client
032 * contention.
033 * <p>
034 * <b>Core Algorithm:</b> This rate limiter divides time into fixed refill intervals (configurable
035 * via {@code hbase.quota.rate.limiter.refill.interval.ms}, default is 1 refill per TimeUnit of the
036 * RateLimiter). At the beginning of each interval, a fresh allocation of resources becomes
037 * available based on the configured limit. Clients consume resources as they make requests. When
038 * resources are exhausted, clients must wait until the next refill, or until enough resources
039 * become available.
040 * <p>
041 * <b>Adaptive Backpressure:</b> When multiple threads compete for limited resources (contention),
042 * this limiter detects the contention and applies increasing backpressure by extending wait
043 * intervals. This prevents thundering herd behavior where many threads wake simultaneously and
044 * compete for the same resources. The backoff multiplier increases by a small increment (see
045 * {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}) per interval when contention occurs, and
046 * decreases (see {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}) when no contention is
047 * detected, converging toward optimal throughput. The multiplier is capped at a maximum value (see
048 * {@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}) to prevent unbounded waits.
049 * <p>
050 * Contention is detected when {@link #getWaitInterval} is called with insufficient available
051 * resources (i.e., {@code amount > available}), indicating a thread needs to wait for resources. If
052 * this occurs more than once in a refill interval, the limiter identifies it as contention
053 * requiring increased backpressure.
054 * <p>
055 * <b>Oversubscription for Full Utilization:</b> In practice, synchronization overhead and timing
056 * variations often prevent clients from consuming exactly their full allowance, resulting in
057 * consistent under-utilization. This limiter addresses this by tracking utilization via an
058 * exponentially weighted moving average (EWMA). When average utilization falls below the target
059 * range (determined by {@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}), the limiter gradually
060 * increases the oversubscription proportion (see
061 * {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}), allowing more resources per interval than
062 * the base limit. Conversely, when utilization exceeds the target range, oversubscription is
063 * decreased (see {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}). Oversubscription is capped
064 * (see {@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}) to prevent excessive bursts while still
065 * enabling consistent full utilization.
066 * <p>
067 * <b>Example Scenario:</b> Consider a quota of 1000 requests per second with a 1-second refill
068 * interval. Without oversubscription, clients might typically achieve only 950 req/s due to
069 * coordination delays. This limiter would detect the under-utilization, gradually increase
070 * oversubscription, allowing slightly more resources per interval, which compensates for
071 * inefficiencies and achieves stable throughput closer to the configured quota. If multiple threads
072 * simultaneously try to consume resources and repeatedly wait, the backoff multiplier increases
073 * their wait times, spreading out their retry attempts and reducing wasted CPU cycles.
074 * <p>
075 * <b>Configuration Parameters:</b>
076 * <ul>
077 * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}: Controls rate of backpressure
078 * increase</li>
079 * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}: Controls rate of backpressure
080 * decrease</li>
081 * <li>{@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}: Caps the maximum wait time extension</li>
082 * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}: Controls rate of oversubscription
083 * increase</li>
084 * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}: Controls rate of oversubscription
085 * decrease</li>
086 * <li>{@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}: Caps the maximum burst capacity</li>
087 * <li>{@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}: Defines the acceptable range around full
088 * utilization</li>
089 * </ul>
090 * <p>
091 * This algorithm converges toward stable operation where: (1) wait intervals are just long enough
092 * to prevent excessive contention, and (2) oversubscription is just high enough to achieve
093 * consistent full utilization of the configured allowance.
094 */
095@InterfaceAudience.Private
096@InterfaceStability.Evolving
097public class FeedbackAdaptiveRateLimiter extends RateLimiter {
098
099  /**
100   * Amount to increase the backoff multiplier when contention is detected per refill interval. In
101   * other words, if we are throttling more than once per refill interval, then we will increase our
102   * wait intervals (increase backpressure, decrease throughput).
103   */
104  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT =
105    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment";
106  public static final double DEFAULT_BACKOFF_MULTIPLIER_INCREMENT = 0.0005;
107
108  /**
109   * Amount to decrease the backoff multiplier when no contention is detected per refill interval.
110   * In other words, if we are only throttling once per refill interval, then we will decrease our
111   * wait interval (decrease backpressure, increase throughput).
112   */
113  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT =
114    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement";
115  public static final double DEFAULT_BACKOFF_MULTIPLIER_DECREMENT = 0.0001;
116
117  /**
118   * Maximum ceiling for the backoff multiplier to avoid unbounded waits.
119   */
120  public static final String FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER =
121    "hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier";
122  public static final double DEFAULT_MAX_BACKOFF_MULTIPLIER = 10.0;
123
124  /**
125   * Amount to increase the oversubscription proportion when utilization is below (1.0-errorBudget).
126   */
127  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT =
128    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment";
129  public static final double DEFAULT_OVERSUBSCRIPTION_INCREMENT = 0.001;
130
131  /**
132   * Amount to decrease the oversubscription proportion when utilization exceeds (1.0+errorBudget).
133   */
134  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT =
135    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement";
136  public static final double DEFAULT_OVERSUBSCRIPTION_DECREMENT = 0.00005;
137
138  /**
139   * Maximum ceiling for oversubscription to prevent unbounded bursts. Some oversubscription can be
140   * nice, because it allows you to balance the inefficiency and latency of retries, landing on
141   * stable usage at approximately your configured allowance. Without adequate oversubscription,
142   * your steady state may often seem significantly, and suspiciously, lower than your configured
143   * allowance.
144   */
145  public static final String FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION =
146    "hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription";
147  public static final double DEFAULT_MAX_OVERSUBSCRIPTION = 0.25;
148
149  /**
150   * Acceptable deviation around full utilization (1.0) for adjusting oversubscription. If stable
151   * throttle usage is typically under (1.0-errorBudget), then we will allow more oversubscription.
152   * If stable throttle usage is typically over (1.0+errorBudget), then we will pull back
153   * oversubscription.
154   */
155  public static final String FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET =
156    "hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget";
157  public static final double DEFAULT_UTILIZATION_ERROR_BUDGET = 0.025;
158
159  private static final int WINDOW_TIME_MS = 60_000;
160
161  public static class FeedbackAdaptiveRateLimiterFactory {
162
163    private final long refillInterval;
164    private final double backoffMultiplierIncrement;
165    private final double backoffMultiplierDecrement;
166    private final double maxBackoffMultiplier;
167    private final double oversubscriptionIncrement;
168    private final double oversubscriptionDecrement;
169    private final double maxOversubscription;
170    private final double utilizationErrorBudget;
171
172    public FeedbackAdaptiveRateLimiterFactory(Configuration conf) {
173      refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
174        RateLimiter.DEFAULT_TIME_UNIT);
175
176      maxBackoffMultiplier =
177        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, DEFAULT_MAX_BACKOFF_MULTIPLIER);
178
179      backoffMultiplierIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT,
180        DEFAULT_BACKOFF_MULTIPLIER_INCREMENT);
181      backoffMultiplierDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
182        DEFAULT_BACKOFF_MULTIPLIER_DECREMENT);
183
184      oversubscriptionIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT,
185        DEFAULT_OVERSUBSCRIPTION_INCREMENT);
186      oversubscriptionDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT,
187        DEFAULT_OVERSUBSCRIPTION_DECREMENT);
188
189      maxOversubscription =
190        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, DEFAULT_MAX_OVERSUBSCRIPTION);
191      utilizationErrorBudget = conf.getDouble(FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET,
192        DEFAULT_UTILIZATION_ERROR_BUDGET);
193    }
194
195    public FeedbackAdaptiveRateLimiter create() {
196      return new FeedbackAdaptiveRateLimiter(refillInterval, backoffMultiplierIncrement,
197        backoffMultiplierDecrement, maxBackoffMultiplier, oversubscriptionIncrement,
198        oversubscriptionDecrement, maxOversubscription, utilizationErrorBudget);
199    }
200  }
201
202  private volatile long nextRefillTime = -1L;
203  private final long refillInterval;
204  private final double backoffMultiplierIncrement;
205  private final double backoffMultiplierDecrement;
206  private final double maxBackoffMultiplier;
207  private final double oversubscriptionIncrement;
208  private final double oversubscriptionDecrement;
209  private final double maxOversubscription;
210  private final double minTargetUtilization;
211  private final double maxTargetUtilization;
212
213  // Adaptive backoff state
214  private final AtomicDouble currentBackoffMultiplier = new AtomicDouble(1.0);
215  private volatile boolean hadContentionThisInterval = false;
216
217  // Over-subscription proportion state
218  private final AtomicDouble oversubscriptionProportion = new AtomicDouble(0.0);
219
220  // EWMA tracking
221  private final double emaAlpha;
222  private volatile double utilizationEma = 0.0;
223  private final AtomicLong lastIntervalConsumed;
224
225  FeedbackAdaptiveRateLimiter(long refillInterval, double backoffMultiplierIncrement,
226    double backoffMultiplierDecrement, double maxBackoffMultiplier,
227    double oversubscriptionIncrement, double oversubscriptionDecrement, double maxOversubscription,
228    double utilizationErrorBudget) {
229    super();
230    Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval, String.format(
231      "Refill interval %s must be ≤ TimeUnit millis %s", refillInterval, getTimeUnitInMillis()));
232
233    Preconditions.checkArgument(backoffMultiplierIncrement > 0.0,
234      String.format("Backoff multiplier increment %s must be > 0.0", backoffMultiplierIncrement));
235    Preconditions.checkArgument(backoffMultiplierDecrement > 0.0,
236      String.format("Backoff multiplier decrement %s must be > 0.0", backoffMultiplierDecrement));
237    Preconditions.checkArgument(maxBackoffMultiplier > 1.0,
238      String.format("Max backoff multiplier %s must be > 1.0", maxBackoffMultiplier));
239    Preconditions.checkArgument(utilizationErrorBudget > 0.0 && utilizationErrorBudget <= 1.0,
240      String.format("Utilization error budget %s must be between 0.0 and 1.0",
241        utilizationErrorBudget));
242
243    this.refillInterval = refillInterval;
244    this.backoffMultiplierIncrement = backoffMultiplierIncrement;
245    this.backoffMultiplierDecrement = backoffMultiplierDecrement;
246    this.maxBackoffMultiplier = maxBackoffMultiplier;
247    this.oversubscriptionIncrement = oversubscriptionIncrement;
248    this.oversubscriptionDecrement = oversubscriptionDecrement;
249    this.maxOversubscription = maxOversubscription;
250    this.minTargetUtilization = 1.0 - utilizationErrorBudget;
251    this.maxTargetUtilization = 1.0 + utilizationErrorBudget;
252
253    this.emaAlpha = refillInterval / (double) (WINDOW_TIME_MS + refillInterval);
254    this.lastIntervalConsumed = new AtomicLong(0);
255  }
256
257  @Override
258  public long refill(long limit) {
259    final long now = EnvironmentEdgeManager.currentTime();
260    if (nextRefillTime == -1) {
261      nextRefillTime = now + refillInterval;
262      hadContentionThisInterval = false;
263      return getOversubscribedLimit(limit);
264    }
265    if (now < nextRefillTime) {
266      return 0;
267    }
268    long diff = refillInterval + now - nextRefillTime;
269    long refills = diff / refillInterval;
270    nextRefillTime = now + refillInterval;
271
272    long intendedUsage = getRefillIntervalAdjustedLimit(limit);
273    if (intendedUsage > 0) {
274      long consumed = lastIntervalConsumed.get();
275      if (consumed > 0) {
276        double util = (double) consumed / intendedUsage;
277        utilizationEma = emaAlpha * util + (1.0 - emaAlpha) * utilizationEma;
278      }
279    }
280
281    if (hadContentionThisInterval) {
282      currentBackoffMultiplier.set(Math
283        .min(currentBackoffMultiplier.get() + backoffMultiplierIncrement, maxBackoffMultiplier));
284    } else {
285      currentBackoffMultiplier
286        .set(Math.max(currentBackoffMultiplier.get() - backoffMultiplierDecrement, 1.0));
287    }
288
289    double avgUtil = utilizationEma;
290    if (avgUtil < minTargetUtilization) {
291      oversubscriptionProportion.set(Math
292        .min(oversubscriptionProportion.get() + oversubscriptionIncrement, maxOversubscription));
293    } else if (avgUtil >= maxTargetUtilization) {
294      oversubscriptionProportion
295        .set(Math.max(oversubscriptionProportion.get() - oversubscriptionDecrement, 0.0));
296    }
297
298    hadContentionThisInterval = false;
299    lastIntervalConsumed.set(0);
300
301    long refillAmount = refills * getRefillIntervalAdjustedLimit(limit);
302    long maxRefill = getOversubscribedLimit(limit);
303    return Math.min(maxRefill, refillAmount);
304  }
305
306  private long getOversubscribedLimit(long limit) {
307    return limit + (long) (limit * oversubscriptionProportion.get());
308  }
309
310  @Override
311  public void consume(long amount) {
312    super.consume(amount);
313    lastIntervalConsumed.addAndGet(amount);
314  }
315
316  @Override
317  public long getWaitInterval(long limit, long available, long amount) {
318    limit = getRefillIntervalAdjustedLimit(limit);
319    if (nextRefillTime == -1) {
320      return 0;
321    }
322
323    final long now = EnvironmentEdgeManager.currentTime();
324    final long refillTime = nextRefillTime;
325    long diff = amount - available;
326    if (diff > 0) {
327      hadContentionThisInterval = true;
328    }
329
330    long nextInterval = refillTime - now;
331    if (diff <= limit) {
332      return applyBackoffMultiplier(nextInterval);
333    }
334
335    long extra = diff / limit;
336    if (diff % limit == 0) {
337      extra--;
338    }
339    long baseWait = nextInterval + (extra * refillInterval);
340    return applyBackoffMultiplier(baseWait);
341  }
342
343  private long getRefillIntervalAdjustedLimit(long limit) {
344    return (long) Math.ceil(refillInterval / (double) getTimeUnitInMillis() * limit);
345  }
346
347  private long applyBackoffMultiplier(long baseWaitInterval) {
348    return (long) (baseWaitInterval * currentBackoffMultiplier.get());
349  }
350
351  // strictly for testing
352  @Override
353  public void setNextRefillTime(long nextRefillTime) {
354    this.nextRefillTime = nextRefillTime;
355  }
356
357  @Override
358  public long getNextRefillTime() {
359    return this.nextRefillTime;
360  }
361}