/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicDouble;

/**
 * An adaptive rate limiter that dynamically adjusts its behavior based on observed usage patterns
 * to achieve stable, full utilization of configured quota allowances while managing client
 * contention.
 * <p>
 * <b>Core Algorithm:</b> This rate limiter divides time into fixed refill intervals (configurable
 * via {@code hbase.quota.rate.limiter.refill.interval.ms}, default is 1 refill per TimeUnit of the
 * RateLimiter). At the beginning of each interval, a fresh allocation of resources becomes
 * available based on the configured limit. Clients consume resources as they make requests. When
 * resources are exhausted, clients must wait until the next refill, or until enough resources
 * become available.
 * <p>
 * <b>Adaptive Backpressure:</b> When multiple threads compete for limited resources (contention),
 * this limiter detects the contention and applies increasing backpressure by extending wait
 * intervals. This prevents thundering herd behavior where many threads wake simultaneously and
 * compete for the same resources. The backoff multiplier increases by a small increment (see
 * {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}) per interval when contention occurs, and
 * decreases (see {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}) when no contention is
 * detected, converging toward optimal throughput. The multiplier is capped at a maximum value (see
 * {@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}) to prevent unbounded waits, and never drops
 * below 1.0.
 * <p>
 * Contention is detected when {@link #getWaitInterval} is called with insufficient available
 * resources (i.e., {@code amount > available}), indicating a thread needs to wait for resources.
 * Any such call during a refill interval marks that interval as contended; the backoff multiplier
 * is then raised at the next refill. Intervals in which no such call was observed lower it.
 * <p>
 * <b>Oversubscription for Full Utilization:</b> In practice, synchronization overhead and timing
 * variations often prevent clients from consuming exactly their full allowance, resulting in
 * consistent under-utilization. This limiter addresses this by tracking utilization via an
 * exponentially weighted moving average (EWMA). When average utilization falls below the target
 * range (determined by {@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}), the limiter gradually
 * increases the oversubscription proportion (see
 * {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}), allowing more resources per interval than
 * the base limit. Conversely, when utilization reaches or exceeds the top of the target range,
 * oversubscription is decreased (see {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}).
 * Oversubscription is capped (see {@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}) to prevent
 * excessive bursts while still enabling consistent full utilization.
 * <p>
 * <b>Example Scenario:</b> Consider a quota of 1000 requests per second with a 1-second refill
 * interval. Without oversubscription, clients might typically achieve only 950 req/s due to
 * coordination delays. This limiter would detect the under-utilization, gradually increase
 * oversubscription, allowing slightly more resources per interval, which compensates for
 * inefficiencies and achieves stable throughput closer to the configured quota. If multiple threads
 * simultaneously try to consume resources and repeatedly wait, the backoff multiplier increases
 * their wait times, spreading out their retry attempts and reducing wasted CPU cycles.
 * <p>
 * <b>Configuration Parameters:</b>
 * <ul>
 * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}: Controls rate of backpressure
 * increase</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}: Controls rate of backpressure
 * decrease</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}: Caps the maximum wait time extension</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}: Controls rate of oversubscription
 * increase</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}: Controls rate of oversubscription
 * decrease</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}: Caps the maximum burst capacity</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}: Defines the acceptable range around full
 * utilization</li>
 * </ul>
 * <p>
 * This algorithm converges toward stable operation where: (1) wait intervals are just long enough
 * to prevent excessive contention, and (2) oversubscription is just high enough to achieve
 * consistent full utilization of the configured allowance.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FeedbackAdaptiveRateLimiter extends RateLimiter {

  /**
   * Amount to increase the backoff multiplier when contention is detected per refill interval. In
   * other words, if {@link #getWaitInterval} was asked for more than was available at least once
   * during a refill interval, then we will increase our wait intervals (increase backpressure,
   * decrease throughput).
   */
  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment";
  public static final double DEFAULT_BACKOFF_MULTIPLIER_INCREMENT = 0.0005;

  /**
   * Amount to decrease the backoff multiplier when no contention is detected per refill interval.
   * In other words, if {@link #getWaitInterval} was never asked for more than was available during
   * a refill interval, then we will decrease our wait interval (decrease backpressure, increase
   * throughput).
   */
  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement";
  public static final double DEFAULT_BACKOFF_MULTIPLIER_DECREMENT = 0.0001;

  /**
   * Maximum ceiling for the backoff multiplier to avoid unbounded waits.
   */
  public static final String FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER =
    "hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier";
  public static final double DEFAULT_MAX_BACKOFF_MULTIPLIER = 10.0;

  /**
   * Amount to increase the oversubscription proportion when utilization is below (1.0-errorBudget).
   */
  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment";
  public static final double DEFAULT_OVERSUBSCRIPTION_INCREMENT = 0.001;

  /**
   * Amount to decrease the oversubscription proportion when utilization is at or above
   * (1.0+errorBudget).
   */
  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement";
  public static final double DEFAULT_OVERSUBSCRIPTION_DECREMENT = 0.00005;

  /**
   * Maximum ceiling for oversubscription to prevent unbounded bursts. Some oversubscription can be
   * nice, because it allows you to balance the inefficiency and latency of retries, landing on
   * stable usage at approximately your configured allowance. Without adequate oversubscription,
   * your steady state may often seem significantly, and suspiciously, lower than your configured
   * allowance.
   */
  public static final String FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION =
    "hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription";
  public static final double DEFAULT_MAX_OVERSUBSCRIPTION = 0.25;

  /**
   * Acceptable deviation around full utilization (1.0) for adjusting oversubscription. If stable
   * throttle usage is typically under (1.0-errorBudget), then we will allow more oversubscription.
   * If stable throttle usage is typically at or over (1.0+errorBudget), then we will pull back
   * oversubscription.
   */
  public static final String FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET =
    "hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget";
  public static final double DEFAULT_UTILIZATION_ERROR_BUDGET = 0.025;

  /** Time constant, in milliseconds, used to derive the EWMA smoothing factor. */
  private static final int WINDOW_TIME_MS = 60_000;

  /**
   * Reads the limiter's tuning parameters from a {@link Configuration} once and then stamps out
   * {@link FeedbackAdaptiveRateLimiter} instances that share those settings.
   */
  public static class FeedbackAdaptiveRateLimiterFactory {

    private final long refillInterval;
    private final double backoffMultiplierIncrement;
    private final double backoffMultiplierDecrement;
    private final double maxBackoffMultiplier;
    private final double oversubscriptionIncrement;
    private final double oversubscriptionDecrement;
    private final double maxOversubscription;
    private final double utilizationErrorBudget;

    /**
     * Captures every tuning parameter from {@code conf}, falling back to the {@code DEFAULT_*}
     * constants of the enclosing class (and {@link RateLimiter#DEFAULT_TIME_UNIT} for the refill
     * interval) when a key is absent.
     * @param conf configuration to read the settings from
     */
    public FeedbackAdaptiveRateLimiterFactory(Configuration conf) {
      refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
        RateLimiter.DEFAULT_TIME_UNIT);

      maxBackoffMultiplier =
        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, DEFAULT_MAX_BACKOFF_MULTIPLIER);

      backoffMultiplierIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT,
        DEFAULT_BACKOFF_MULTIPLIER_INCREMENT);
      backoffMultiplierDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
        DEFAULT_BACKOFF_MULTIPLIER_DECREMENT);

      oversubscriptionIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT,
        DEFAULT_OVERSUBSCRIPTION_INCREMENT);
      oversubscriptionDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT,
        DEFAULT_OVERSUBSCRIPTION_DECREMENT);

      maxOversubscription =
        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, DEFAULT_MAX_OVERSUBSCRIPTION);
      utilizationErrorBudget = conf.getDouble(FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET,
        DEFAULT_UTILIZATION_ERROR_BUDGET);
    }

    /**
     * Builds a new limiter from the captured settings.
     * @return a fresh {@link FeedbackAdaptiveRateLimiter}
     * @throws IllegalArgumentException if any captured setting fails the limiter's validation
     */
    public FeedbackAdaptiveRateLimiter create() {
      return new FeedbackAdaptiveRateLimiter(refillInterval, backoffMultiplierIncrement,
        backoffMultiplierDecrement, maxBackoffMultiplier, oversubscriptionIncrement,
        oversubscriptionDecrement, maxOversubscription, utilizationErrorBudget);
    }
  }

  // -1 means "no refill has happened yet"; see refill() and getWaitInterval().
  private volatile long nextRefillTime = -1L;
  private final long refillInterval;
  private final double backoffMultiplierIncrement;
  private final double backoffMultiplierDecrement;
  private final double maxBackoffMultiplier;
  private final double oversubscriptionIncrement;
  private final double oversubscriptionDecrement;
  private final double maxOversubscription;
  private final double minTargetUtilization;
  private final double maxTargetUtilization;

  // Adaptive backoff state
  private final AtomicDouble currentBackoffMultiplier = new AtomicDouble(1.0);
  private volatile boolean hadContentionThisInterval = false;

  // Over-subscription proportion state
  private final AtomicDouble oversubscriptionProportion = new AtomicDouble(0.0);

  // EWMA tracking
  private final double emaAlpha;
  private volatile double utilizationEma = 0.0;
  private final AtomicLong lastIntervalConsumed;

  /**
   * Creates a limiter with explicit tuning parameters.
   * @param refillInterval             length of one refill interval in milliseconds; must be
   *                                   positive and no larger than the limiter's time unit
   * @param backoffMultiplierIncrement step added to the backoff multiplier after a contended
   *                                   interval; must be &gt; 0
   * @param backoffMultiplierDecrement step removed from the backoff multiplier after an
   *                                   uncontended interval; must be &gt; 0
   * @param maxBackoffMultiplier       upper bound of the backoff multiplier; must be &gt; 1
   * @param oversubscriptionIncrement  step added to the oversubscription proportion when average
   *                                   utilization is below target; must be &ge; 0
   * @param oversubscriptionDecrement  step removed from the oversubscription proportion when
   *                                   average utilization is at or above target; must be &ge; 0
   * @param maxOversubscription        upper bound of the oversubscription proportion; must be
   *                                   &ge; 0 (0 disables oversubscription)
   * @param utilizationErrorBudget     half-width of the target utilization band around 1.0; must
   *                                   be in (0, 1]
   * @throws IllegalArgumentException if any argument is outside its documented range
   */
  FeedbackAdaptiveRateLimiter(long refillInterval, double backoffMultiplierIncrement,
    double backoffMultiplierDecrement, double maxBackoffMultiplier,
    double oversubscriptionIncrement, double oversubscriptionDecrement, double maxOversubscription,
    double utilizationErrorBudget) {
    super();
    Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval, String.format(
      "Refill interval %s must be ≤ TimeUnit millis %s", refillInterval, getTimeUnitInMillis()));
    // refill() divides by refillInterval, so a non-positive value must be rejected up front
    // rather than surfacing later as an ArithmeticException or a nonsensical refill count.
    Preconditions.checkArgument(refillInterval > 0,
      String.format("Refill interval %s must be > 0", refillInterval));

    Preconditions.checkArgument(backoffMultiplierIncrement > 0.0,
      String.format("Backoff multiplier increment %s must be > 0.0", backoffMultiplierIncrement));
    Preconditions.checkArgument(backoffMultiplierDecrement > 0.0,
      String.format("Backoff multiplier decrement %s must be > 0.0", backoffMultiplierDecrement));
    Preconditions.checkArgument(maxBackoffMultiplier > 1.0,
      String.format("Max backoff multiplier %s must be > 1.0", maxBackoffMultiplier));

    // Zero is accepted for the oversubscription knobs so the feature can be switched off, but a
    // negative value would push the proportion below zero and silently shrink the quota.
    Preconditions.checkArgument(oversubscriptionIncrement >= 0.0,
      String.format("Oversubscription increment %s must be >= 0.0", oversubscriptionIncrement));
    Preconditions.checkArgument(oversubscriptionDecrement >= 0.0,
      String.format("Oversubscription decrement %s must be >= 0.0", oversubscriptionDecrement));
    Preconditions.checkArgument(maxOversubscription >= 0.0,
      String.format("Max oversubscription %s must be >= 0.0", maxOversubscription));

    Preconditions.checkArgument(utilizationErrorBudget > 0.0 && utilizationErrorBudget <= 1.0,
      String.format("Utilization error budget %s must be between 0.0 and 1.0",
        utilizationErrorBudget));

    this.refillInterval = refillInterval;
    this.backoffMultiplierIncrement = backoffMultiplierIncrement;
    this.backoffMultiplierDecrement = backoffMultiplierDecrement;
    this.maxBackoffMultiplier = maxBackoffMultiplier;
    this.oversubscriptionIncrement = oversubscriptionIncrement;
    this.oversubscriptionDecrement = oversubscriptionDecrement;
    this.maxOversubscription = maxOversubscription;
    this.minTargetUtilization = 1.0 - utilizationErrorBudget;
    this.maxTargetUtilization = 1.0 + utilizationErrorBudget;

    // Smoothing factor for the utilization EWMA, derived from the refill interval and the
    // averaging window: shorter intervals give each individual sample less weight.
    this.emaAlpha = refillInterval / (double) (WINDOW_TIME_MS + refillInterval);
    this.lastIntervalConsumed = new AtomicLong(0);
  }

  /**
   * Computes how many resources become available at the current time and, when a refill interval
   * has elapsed, updates the adaptive state (utilization EWMA, backoff multiplier and
   * oversubscription proportion) from what was observed since the previous refill.
   * @param limit the configured limit per time unit
   * @return the oversubscribed limit on the very first call; {@code 0} while the next refill time
   *         has not been reached; otherwise the per-interval allowance multiplied by the number of
   *         elapsed intervals, capped at the oversubscribed limit
   */
  @Override
  public long refill(long limit) {
    final long now = EnvironmentEdgeManager.currentTime();
    if (nextRefillTime == -1) {
      // First refill: start the clock and hand out the (oversubscribed) full limit.
      nextRefillTime = now + refillInterval;
      hadContentionThisInterval = false;
      return getOversubscribedLimit(limit);
    }
    if (now < nextRefillTime) {
      return 0;
    }
    // Count the interval that just ended plus any further whole intervals that were missed.
    long diff = refillInterval + now - nextRefillTime;
    long refills = diff / refillInterval;
    nextRefillTime = now + refillInterval;

    long intendedUsage = getRefillIntervalAdjustedLimit(limit);
    if (intendedUsage > 0) {
      long consumed = lastIntervalConsumed.get();
      // Periods with no recorded consumption do not contribute a sample to the average.
      if (consumed > 0) {
        double util = (double) consumed / intendedUsage;
        utilizationEma = emaAlpha * util + (1.0 - emaAlpha) * utilizationEma;
      }
    }

    if (hadContentionThisInterval) {
      currentBackoffMultiplier.set(Math
        .min(currentBackoffMultiplier.get() + backoffMultiplierIncrement, maxBackoffMultiplier));
    } else {
      currentBackoffMultiplier
        .set(Math.max(currentBackoffMultiplier.get() - backoffMultiplierDecrement, 1.0));
    }

    double avgUtil = utilizationEma;
    if (avgUtil < minTargetUtilization) {
      oversubscriptionProportion.set(Math
        .min(oversubscriptionProportion.get() + oversubscriptionIncrement, maxOversubscription));
    } else if (avgUtil >= maxTargetUtilization) {
      oversubscriptionProportion
        .set(Math.max(oversubscriptionProportion.get() - oversubscriptionDecrement, 0.0));
    }

    // Start the new interval with clean per-interval observations.
    hadContentionThisInterval = false;
    lastIntervalConsumed.set(0);

    long refillAmount = refills * intendedUsage;
    // Computed after the proportion update above so the new proportion applies immediately.
    long maxRefill = getOversubscribedLimit(limit);
    return Math.min(maxRefill, refillAmount);
  }

  /**
   * Scales {@code limit} up by the current oversubscription proportion.
   * @param limit the configured limit
   * @return {@code limit} plus the oversubscribed extra, truncated toward zero
   */
  private long getOversubscribedLimit(long limit) {
    return limit + (long) (limit * oversubscriptionProportion.get());
  }

  /**
   * Consumes {@code amount} via the superclass and additionally records it toward the current
   * interval's utilization sample.
   * @param amount the number of resources to consume
   */
  @Override
  public void consume(long amount) {
    super.consume(amount);
    lastIntervalConsumed.addAndGet(amount);
  }

  /**
   * Estimates how long a caller should wait before {@code amount} resources could be available,
   * scaled by the current backoff multiplier. As a side effect, a call with
   * {@code amount > available} marks the current refill interval as contended.
   * @param limit     the configured limit per time unit
   * @param available the resources currently available
   * @param amount    the resources the caller wants
   * @return {@code 0} if no refill has happened yet; otherwise the time until the next refill plus
   *         one refill interval for each additional refill needed to cover the shortfall, multiplied
   *         by the backoff multiplier
   */
  @Override
  public long getWaitInterval(long limit, long available, long amount) {
    limit = getRefillIntervalAdjustedLimit(limit);
    if (nextRefillTime == -1) {
      return 0;
    }

    final long now = EnvironmentEdgeManager.currentTime();
    final long refillTime = nextRefillTime;
    long diff = amount - available;
    if (diff > 0) {
      hadContentionThisInterval = true;
    }

    long nextInterval = refillTime - now;
    // A shortfall no larger than one interval's allowance is covered by the very next refill.
    if (diff <= limit) {
      return applyBackoffMultiplier(nextInterval);
    }

    // NOTE(review): if the adjusted limit is 0 while diff > 0 this divides by zero; presumably
    // callers never reach this with a zero limit -- confirm against RateLimiter.
    // Number of refills needed beyond the next one: ceil(diff / limit) - 1.
    long extra = diff / limit;
    if (diff % limit == 0) {
      extra--;
    }
    long baseWait = nextInterval + (extra * refillInterval);
    return applyBackoffMultiplier(baseWait);
  }

  /**
   * Converts a per-time-unit limit into the allowance for a single refill interval.
   * @param limit the configured limit per time unit
   * @return {@code limit} scaled by {@code refillInterval / timeUnitMillis}, rounded up
   */
  private long getRefillIntervalAdjustedLimit(long limit) {
    return (long) Math.ceil(refillInterval / (double) getTimeUnitInMillis() * limit);
  }

  /**
   * Stretches a wait interval by the current backoff multiplier.
   * @param baseWaitInterval the unscaled wait in milliseconds
   * @return the scaled wait, truncated toward zero
   */
  private long applyBackoffMultiplier(long baseWaitInterval) {
    return (long) (baseWaitInterval * currentBackoffMultiplier.get());
  }

  // strictly for testing
  @Override
  public void setNextRefillTime(long nextRefillTime) {
    this.nextRefillTime = nextRefillTime;
  }

  /**
   * Returns the time at which the next refill becomes due.
   * @return the next refill time, or {@code -1} if no refill has happened yet
   */
  @Override
  public long getNextRefillTime() {
    return this.nextRefillTime;
  }
}