001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.util.concurrent.TimeUnit;
021import org.apache.yetus.audience.InterfaceAudience;
022import org.apache.yetus.audience.InterfaceStability;
023
024/**
025 * Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter
026 * RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter();
027 * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute
028 * before performing resource consuming operation bool canExecute = limiter.canExecute(); // If
029 * there are no available resources, wait until one is available if (!canExecute)
030 * Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource...
031 * limiter.consume(); }
032 */
033@InterfaceAudience.Private
034@InterfaceStability.Evolving
035@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
036    justification = "FindBugs seems confused; says limit and tlimit "
037      + "are mostly synchronized...but to me it looks like they are totally synchronized")
038public abstract class RateLimiter {
039  public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
040  private long tunit = 1000; // Timeunit factor for translating to ms.
041  private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
042  private long avail = Long.MAX_VALUE; // Currently available resource units
043
044  /**
045   * Refill the available units w.r.t the elapsed time.
046   * @param limit Maximum available resource units that can be refilled to.
047   * @return how many resource units may be refilled ?
048   */
049  abstract long refill(long limit);
050
051  /**
052   * Time in milliseconds to wait for before requesting to consume 'amount' resource.
053   * @param limit     Maximum available resource units that can be refilled to.
054   * @param available Currently available resource units
055   * @param amount    Resources for which time interval to calculate for
056   * @return estimate of the ms required to wait before being able to provide 'amount' resources.
057   */
058  abstract long getWaitInterval(long limit, long available, long amount);
059
060  /**
061   * Set the RateLimiter max available resources and refill period.
062   * @param limit    The max value available resource units can be refilled to.
063   * @param timeUnit Timeunit factor for translating to ms.
064   */
065  public synchronized void set(final long limit, final TimeUnit timeUnit) {
066    switch (timeUnit) {
067      case MILLISECONDS:
068        tunit = 1;
069        break;
070      case SECONDS:
071        tunit = 1000;
072        break;
073      case MINUTES:
074        tunit = 60 * 1000;
075        break;
076      case HOURS:
077        tunit = 60 * 60 * 1000;
078        break;
079      case DAYS:
080        tunit = 24 * 60 * 60 * 1000;
081        break;
082      default:
083        throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
084    }
085    this.limit = limit;
086    this.avail = limit;
087  }
088
089  @Override
090  public String toString() {
091    String rateLimiter = this.getClass().getSimpleName();
092    if (getLimit() == Long.MAX_VALUE) {
093      return rateLimiter + "(Bypass)";
094    }
095    return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() + " tunit="
096      + getTimeUnitInMillis() + ")";
097  }
098
099  /**
100   * Sets the current instance of RateLimiter to a new values. if current limit is smaller than the
101   * new limit, bump up the available resources. Otherwise allow clients to use up the previously
102   * available resources.
103   */
104  public synchronized void update(final RateLimiter other) {
105    this.tunit = other.tunit;
106    if (this.limit < other.limit) {
107      // If avail is capped to this.limit, it will never overflow,
108      // otherwise, avail may overflow, just be careful here.
109      long diff = other.limit - this.limit;
110      if (this.avail <= Long.MAX_VALUE - diff) {
111        this.avail += diff;
112        this.avail = Math.min(this.avail, other.limit);
113      } else {
114        this.avail = other.limit;
115      }
116    }
117    this.limit = other.limit;
118  }
119
120  public synchronized boolean isBypass() {
121    return getLimit() == Long.MAX_VALUE;
122  }
123
124  public synchronized long getLimit() {
125    return limit;
126  }
127
128  public synchronized long getAvailable() {
129    return avail;
130  }
131
132  protected synchronized long getTimeUnitInMillis() {
133    return tunit;
134  }
135
136  /**
137   * Is there at least one resource available to allow execution?
138   * @return true if there is at least one resource available, otherwise false
139   */
140  public boolean canExecute() {
141    return canExecute(1);
142  }
143
144  /**
145   * Are there enough available resources to allow execution?
146   * @param amount the number of required resources, a non-negative number
147   * @return true if there are enough available resources, otherwise false
148   */
149  public synchronized boolean canExecute(final long amount) {
150    if (isBypass()) {
151      return true;
152    }
153
154    long refillAmount = refill(limit);
155    if (refillAmount == 0 && avail < amount) {
156      return false;
157    }
158    // check for positive overflow
159    if (avail <= Long.MAX_VALUE - refillAmount) {
160      avail = Math.max(0, Math.min(avail + refillAmount, limit));
161    } else {
162      avail = Math.max(0, limit);
163    }
164    if (avail >= amount) {
165      return true;
166    }
167    return false;
168  }
169
170  /**
171   * consume one available unit.
172   */
173  public void consume() {
174    consume(1);
175  }
176
177  /**
178   * consume amount available units, amount could be a negative number
179   * @param amount the number of units to consume
180   */
181  public synchronized void consume(final long amount) {
182
183    if (isBypass()) {
184      return;
185    }
186
187    if (amount >= 0) {
188      this.avail -= amount;
189      if (this.avail < 0) {
190        this.avail = 0;
191      }
192    } else {
193      if (this.avail <= Long.MAX_VALUE + amount) {
194        this.avail -= amount;
195        this.avail = Math.min(this.avail, this.limit);
196      } else {
197        this.avail = this.limit;
198      }
199    }
200  }
201
202  /** Returns estimate of the ms required to wait before being able to provide 1 resource. */
203  public long waitInterval() {
204    return waitInterval(1);
205  }
206
207  /**
208   * Returns estimate of the ms required to wait before being able to provide "amount" resources.
209   */
210  public synchronized long waitInterval(final long amount) {
211    // TODO Handle over quota?
212    return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
213  }
214
215  // These two method are for strictly testing purpose only
216
217  public abstract void setNextRefillTime(long nextRefillTime);
218
219  public abstract long getNextRefillTime();
220}