001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.util.concurrent.TimeUnit;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.yetus.audience.InterfaceStability;
025
026/**
027 * Simple rate limiter.
028 *
029 * Usage Example:
030 *    // At this point you have a unlimited resource limiter
031 *   RateLimiter limiter = new AverageIntervalRateLimiter();
032 *                         or new FixedIntervalRateLimiter();
033 *   limiter.set(10, TimeUnit.SECONDS);       // set 10 resources/sec
034 *
035 *   while (true) {
036 *     // call canExecute before performing resource consuming operation
037 *     bool canExecute = limiter.canExecute();
038 *     // If there are no available resources, wait until one is available
039 *     if (!canExecute) Thread.sleep(limiter.waitInterval());
040 *     // ...execute the work and consume the resource...
041 *     limiter.consume();
042 *   }
043 */
044@InterfaceAudience.Private
045@InterfaceStability.Evolving
046@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
047  justification="FindBugs seems confused; says limit and tlimit " +
048  "are mostly synchronized...but to me it looks like they are totally synchronized")
049public abstract class RateLimiter {
050  public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
051  private long tunit = 1000;           // Timeunit factor for translating to ms.
052  private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
053  private long avail = Long.MAX_VALUE; // Currently available resource units
054
055  /**
056   * Refill the available units w.r.t the elapsed time.
057   * @param limit Maximum available resource units that can be refilled to.
058   * @return how many resource units may be refilled ?
059   */
060  abstract long refill(long limit);
061
062  /**
063   * Time in milliseconds to wait for before requesting to consume 'amount' resource.
064   * @param limit Maximum available resource units that can be refilled to.
065   * @param available Currently available resource units
066   * @param amount Resources for which time interval to calculate for
067   * @return estimate of the ms required to wait before being able to provide 'amount' resources.
068   */
069  abstract long getWaitInterval(long limit, long available, long amount);
070
071
072  /**
073   * Set the RateLimiter max available resources and refill period.
074   * @param limit The max value available resource units can be refilled to.
075   * @param timeUnit Timeunit factor for translating to ms.
076   */
077  public synchronized void set(final long limit, final TimeUnit timeUnit) {
078    switch (timeUnit) {
079    case MILLISECONDS:
080      tunit = 1;
081      break;
082    case SECONDS:
083      tunit = 1000;
084      break;
085    case MINUTES:
086      tunit = 60 * 1000;
087      break;
088    case HOURS:
089      tunit = 60 * 60 * 1000;
090      break;
091    case DAYS:
092      tunit = 24 * 60 * 60 * 1000;
093      break;
094    default:
095      throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
096    }
097    this.limit = limit;
098    this.avail = limit;
099  }
100
101  @Override
102  public String toString() {
103    String rateLimiter = this.getClass().getSimpleName();
104    if (getLimit() == Long.MAX_VALUE) {
105      return rateLimiter + "(Bypass)";
106    }
107    return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() +
108        " tunit=" + getTimeUnitInMillis() + ")";
109  }
110
111  /**
112   * Sets the current instance of RateLimiter to a new values.
113   *
114   * if current limit is smaller than the new limit, bump up the available resources.
115   * Otherwise allow clients to use up the previously available resources.
116   */
117  public synchronized void update(final RateLimiter other) {
118    this.tunit = other.tunit;
119    if (this.limit < other.limit) {
120      // If avail is capped to this.limit, it will never overflow,
121      // otherwise, avail may overflow, just be careful here.
122      long diff = other.limit - this.limit;
123      if (this.avail <= Long.MAX_VALUE - diff) {
124        this.avail += diff;
125        this.avail = Math.min(this.avail, other.limit);
126      } else {
127        this.avail = other.limit;
128      }
129    }
130    this.limit = other.limit;
131  }
132
133  public synchronized boolean isBypass() {
134    return getLimit() == Long.MAX_VALUE;
135  }
136
137  public synchronized long getLimit() {
138    return limit;
139  }
140
141  public synchronized long getAvailable() {
142    return avail;
143  }
144
145  protected synchronized long getTimeUnitInMillis() {
146    return tunit;
147  }
148
149  /**
150   * Is there at least one resource available to allow execution?
151   * @return true if there is at least one resource available, otherwise false
152   */
153  public boolean canExecute() {
154    return canExecute(1);
155  }
156
157  /**
158   * Are there enough available resources to allow execution?
159   * @param amount the number of required resources, a non-negative number
160   * @return true if there are enough available resources, otherwise false
161   */
162  public synchronized boolean canExecute(final long amount) {
163    if (isBypass()) {
164      return true;
165    }
166
167    long refillAmount = refill(limit);
168    if (refillAmount == 0 && avail < amount) {
169      return false;
170    }
171    // check for positive overflow
172    if (avail <= Long.MAX_VALUE - refillAmount) {
173      avail = Math.max(0, Math.min(avail + refillAmount, limit));
174    } else {
175      avail = Math.max(0, limit);
176    }
177    if (avail >= amount) {
178      return true;
179    }
180    return false;
181  }
182
183  /**
184   * consume one available unit.
185   */
186  public void consume() {
187    consume(1);
188  }
189
190  /**
191   * consume amount available units, amount could be a negative number
192   * @param amount the number of units to consume
193   */
194  public synchronized void consume(final long amount) {
195
196    if (isBypass()) {
197      return;
198    }
199
200    if (amount >= 0 ) {
201      this.avail -= amount;
202      if (this.avail < 0) {
203        this.avail = 0;
204      }
205    } else {
206      if (this.avail <= Long.MAX_VALUE + amount) {
207        this.avail -= amount;
208        this.avail = Math.min(this.avail, this.limit);
209      } else {
210        this.avail = this.limit;
211      }
212    }
213  }
214
215  /**
216   * @return estimate of the ms required to wait before being able to provide 1 resource.
217   */
218  public long waitInterval() {
219    return waitInterval(1);
220  }
221
222  /**
223   * @return estimate of the ms required to wait before being able to provide "amount" resources.
224   */
225  public synchronized long waitInterval(final long amount) {
226    // TODO Handle over quota?
227    return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
228  }
229
230  // These two method are for strictly testing purpose only
231
232  public abstract void setNextRefillTime(long nextRefillTime);
233
234  public abstract long getNextRefillTime();
235}