001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.util.concurrent.TimeUnit;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.yetus.audience.InterfaceStability;
025
026import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
027
028/**
029 * Simple rate limiter.
030 *
031 * Usage Example:
032 *    // At this point you have a unlimited resource limiter
033 *   RateLimiter limiter = new AverageIntervalRateLimiter();
034 *                         or new FixedIntervalRateLimiter();
035 *   limiter.set(10, TimeUnit.SECONDS);       // set 10 resources/sec
036 *
037 *   while (true) {
038 *     // call canExecute before performing resource consuming operation
039 *     bool canExecute = limiter.canExecute();
040 *     // If there are no available resources, wait until one is available
041 *     if (!canExecute) Thread.sleep(limiter.waitInterval());
042 *     // ...execute the work and consume the resource...
043 *     limiter.consume();
044 *   }
045 */
046@InterfaceAudience.Private
047@InterfaceStability.Evolving
048@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
049  justification="FindBugs seems confused; says limit and tlimit " +
050  "are mostly synchronized...but to me it looks like they are totally synchronized")
051public abstract class RateLimiter {
052  public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
053  private long tunit = 1000;           // Timeunit factor for translating to ms.
054  private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
055  private long avail = Long.MAX_VALUE; // Currently available resource units
056
057  /**
058   * Refill the available units w.r.t the elapsed time.
059   * @param limit Maximum available resource units that can be refilled to.
060   * @return how many resource units may be refilled ?
061   */
062  abstract long refill(long limit);
063
064  /**
065   * Time in milliseconds to wait for before requesting to consume 'amount' resource.
066   * @param limit Maximum available resource units that can be refilled to.
067   * @param available Currently available resource units
068   * @param amount Resources for which time interval to calculate for
069   * @return estimate of the ms required to wait before being able to provide 'amount' resources.
070   */
071  abstract long getWaitInterval(long limit, long available, long amount);
072
073
074  /**
075   * Set the RateLimiter max available resources and refill period.
076   * @param limit The max value available resource units can be refilled to.
077   * @param timeUnit Timeunit factor for translating to ms.
078   */
079  public synchronized void set(final long limit, final TimeUnit timeUnit) {
080    switch (timeUnit) {
081    case MILLISECONDS:
082      tunit = 1;
083      break;
084    case SECONDS:
085      tunit = 1000;
086      break;
087    case MINUTES:
088      tunit = 60 * 1000;
089      break;
090    case HOURS:
091      tunit = 60 * 60 * 1000;
092      break;
093    case DAYS:
094      tunit = 24 * 60 * 60 * 1000;
095      break;
096    default:
097      throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
098    }
099    this.limit = limit;
100    this.avail = limit;
101  }
102
103  @Override
104  public String toString() {
105    String rateLimiter = this.getClass().getSimpleName();
106    if (getLimit() == Long.MAX_VALUE) {
107      return rateLimiter + "(Bypass)";
108    }
109    return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() +
110        " tunit=" + getTimeUnitInMillis() + ")";
111  }
112
113  /**
114   * Sets the current instance of RateLimiter to a new values.
115   *
116   * if current limit is smaller than the new limit, bump up the available resources.
117   * Otherwise allow clients to use up the previously available resources.
118   */
119  public synchronized void update(final RateLimiter other) {
120    this.tunit = other.tunit;
121    if (this.limit < other.limit) {
122      // If avail is capped to this.limit, it will never overflow,
123      // otherwise, avail may overflow, just be careful here.
124      long diff = other.limit - this.limit;
125      if (this.avail <= Long.MAX_VALUE - diff) {
126        this.avail += diff;
127        this.avail = Math.min(this.avail, other.limit);
128      } else {
129        this.avail = other.limit;
130      }
131    }
132    this.limit = other.limit;
133  }
134
135  public synchronized boolean isBypass() {
136    return getLimit() == Long.MAX_VALUE;
137  }
138
139  public synchronized long getLimit() {
140    return limit;
141  }
142
143  public synchronized long getAvailable() {
144    return avail;
145  }
146
147  protected synchronized long getTimeUnitInMillis() {
148    return tunit;
149  }
150
151  /**
152   * Is there at least one resource available to allow execution?
153   * @return true if there is at least one resource available, otherwise false
154   */
155  public boolean canExecute() {
156    return canExecute(1);
157  }
158
159  /**
160   * Are there enough available resources to allow execution?
161   * @param amount the number of required resources, a non-negative number
162   * @return true if there are enough available resources, otherwise false
163   */
164  public synchronized boolean canExecute(final long amount) {
165    if (isBypass()) {
166      return true;
167    }
168
169    long refillAmount = refill(limit);
170    if (refillAmount == 0 && avail < amount) {
171      return false;
172    }
173    // check for positive overflow
174    if (avail <= Long.MAX_VALUE - refillAmount) {
175      avail = Math.max(0, Math.min(avail + refillAmount, limit));
176    } else {
177      avail = Math.max(0, limit);
178    }
179    if (avail >= amount) {
180      return true;
181    }
182    return false;
183  }
184
185  /**
186   * consume one available unit.
187   */
188  public void consume() {
189    consume(1);
190  }
191
192  /**
193   * consume amount available units, amount could be a negative number
194   * @param amount the number of units to consume
195   */
196  public synchronized void consume(final long amount) {
197
198    if (isBypass()) {
199      return;
200    }
201
202    if (amount >= 0 ) {
203      this.avail -= amount;
204      if (this.avail < 0) {
205        this.avail = 0;
206      }
207    } else {
208      if (this.avail <= Long.MAX_VALUE + amount) {
209        this.avail -= amount;
210        this.avail = Math.min(this.avail, this.limit);
211      } else {
212        this.avail = this.limit;
213      }
214    }
215  }
216
217  /**
218   * @return estimate of the ms required to wait before being able to provide 1 resource.
219   */
220  public long waitInterval() {
221    return waitInterval(1);
222  }
223
224  /**
225   * @return estimate of the ms required to wait before being able to provide "amount" resources.
226   */
227  public synchronized long waitInterval(final long amount) {
228    // TODO Handle over quota?
229    return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
230  }
231
232  // These two method are for strictly testing purpose only
233  @VisibleForTesting
234  public abstract void setNextRefillTime(long nextRefillTime);
235
236  @VisibleForTesting
237  public abstract long getNextRefillTime();
238}