001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.util.concurrent.TimeUnit;
021import org.apache.yetus.audience.InterfaceAudience;
022import org.apache.yetus.audience.InterfaceStability;
023
024/**
025 * Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter
026 * RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter();
027 * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0)
028 * { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the
029 * work and consume the resource... limiter.consume();
030 */
031@InterfaceAudience.Private
032@InterfaceStability.Evolving
033@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
034    justification = "FindBugs seems confused; says limit and tlimit "
035      + "are mostly synchronized...but to me it looks like they are totally synchronized")
036public abstract class RateLimiter {
037  public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
038  public static final long DEFAULT_TIME_UNIT = 1000;
039
040  private long tunit = DEFAULT_TIME_UNIT; // Timeunit factor for translating to ms.
041  private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
042  private long avail = Long.MAX_VALUE; // Currently available resource units
043
044  /**
045   * Refill the available units w.r.t the elapsed time.
046   * @param limit Maximum available resource units that can be refilled to.
047   * @return how many resource units may be refilled ?
048   */
049  abstract long refill(long limit);
050
051  /**
052   * Time in milliseconds to wait for before requesting to consume 'amount' resource.
053   * @param limit     Maximum available resource units that can be refilled to.
054   * @param available Currently available resource units
055   * @param amount    Resources for which time interval to calculate for
056   * @return estimate of the ms required to wait before being able to provide 'amount' resources.
057   */
058  abstract long getWaitInterval(long limit, long available, long amount);
059
060  /**
061   * Set the RateLimiter max available resources and refill period.
062   * @param limit    The max value available resource units can be refilled to.
063   * @param timeUnit Timeunit factor for translating to ms.
064   */
065  public synchronized void set(final long limit, final TimeUnit timeUnit) {
066    switch (timeUnit) {
067      case MILLISECONDS:
068        tunit = 1;
069        break;
070      case SECONDS:
071        tunit = 1000;
072        break;
073      case MINUTES:
074        tunit = 60 * 1000;
075        break;
076      case HOURS:
077        tunit = 60 * 60 * 1000;
078        break;
079      case DAYS:
080        tunit = 24 * 60 * 60 * 1000;
081        break;
082      default:
083        throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
084    }
085    this.limit = limit;
086    this.avail = limit;
087  }
088
089  @Override
090  public String toString() {
091    String rateLimiter = this.getClass().getSimpleName();
092    if (getLimit() == Long.MAX_VALUE) {
093      return rateLimiter + "(Bypass)";
094    }
095    return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() + " tunit="
096      + getTimeUnitInMillis() + ")";
097  }
098
099  /**
100   * Sets the current instance of RateLimiter to a new values. if current limit is smaller than the
101   * new limit, bump up the available resources. Otherwise allow clients to use up the previously
102   * available resources.
103   */
104  public synchronized void update(final RateLimiter other) {
105    this.tunit = other.tunit;
106    if (this.limit < other.limit) {
107      // If avail is capped to this.limit, it will never overflow,
108      // otherwise, avail may overflow, just be careful here.
109      long diff = other.limit - this.limit;
110      if (this.avail <= Long.MAX_VALUE - diff) {
111        this.avail += diff;
112        this.avail = Math.min(this.avail, other.limit);
113      } else {
114        this.avail = other.limit;
115      }
116    }
117    this.limit = other.limit;
118  }
119
120  public synchronized boolean isBypass() {
121    return getLimit() == Long.MAX_VALUE;
122  }
123
124  public synchronized long getLimit() {
125    return limit;
126  }
127
128  public synchronized long getAvailable() {
129    return avail;
130  }
131
132  protected synchronized long getTimeUnitInMillis() {
133    return tunit;
134  }
135
136  /**
137   * Is there at least one resource available to allow execution?
138   * @return the waitInterval to backoff, or 0 if execution is allowed
139   */
140  public long getWaitIntervalMs() {
141    return getWaitIntervalMs(1);
142  }
143
144  /**
145   * Are there enough available resources to allow execution?
146   * @param amount the number of required resources, a non-negative number
147   * @return the waitInterval to backoff, or 0 if execution is allowed
148   */
149  public synchronized long getWaitIntervalMs(final long amount) {
150    assert amount >= 0;
151    if (!isAvailable(amount)) {
152      return waitInterval(amount);
153    }
154    return 0;
155  }
156
157  /**
158   * Are there enough available resources to allow execution?
159   * @param amount the number of required resources, a non-negative number
160   * @return true if there are enough available resources, otherwise false
161   */
162  protected boolean isAvailable(final long amount) {
163    if (isBypass()) {
164      return true;
165    }
166
167    long refillAmount = refill(limit);
168    if (refillAmount == 0 && avail < amount) {
169      return false;
170    }
171    // check for positive overflow
172    if (avail <= Long.MAX_VALUE - refillAmount) {
173      avail = Math.min(avail + refillAmount, limit);
174    } else {
175      avail = limit;
176    }
177    if (avail >= amount) {
178      return true;
179    }
180    return false;
181  }
182
183  /**
184   * consume one available unit.
185   */
186  public void consume() {
187    consume(1);
188  }
189
190  /**
191   * consume amount available units, amount could be a negative number
192   * @param amount the number of units to consume
193   */
194  public synchronized void consume(final long amount) {
195
196    if (isBypass()) {
197      return;
198    }
199
200    if (amount >= 0) {
201      this.avail -= amount;
202    } else {
203      if (this.avail <= Long.MAX_VALUE + amount) {
204        this.avail -= amount;
205        this.avail = Math.min(this.avail, this.limit);
206      } else {
207        this.avail = this.limit;
208      }
209    }
210  }
211
212  /** Returns estimate of the ms required to wait before being able to provide 1 resource. */
213  public long waitInterval() {
214    return waitInterval(1);
215  }
216
217  /**
218   * Returns estimate of the ms required to wait before being able to provide "amount" resources.
219   */
220  public synchronized long waitInterval(final long amount) {
221    // TODO Handle over quota?
222    return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
223  }
224
225  // These two method are for strictly testing purpose only
226
227  public abstract void setNextRefillTime(long nextRefillTime);
228
229  public abstract long getNextRefillTime();
230}