View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  
12  package org.apache.hadoop.hbase.quotas;
13  
14  import java.util.concurrent.TimeUnit;
15  
16  import org.apache.hadoop.hbase.classification.InterfaceAudience;
17  import org.apache.hadoop.hbase.classification.InterfaceStability;
18  
19  import com.google.common.annotations.VisibleForTesting;
20  
21  /**
22   * Simple rate limiter.
23   *
24   * Usage Example:
25   *    // At this point you have a unlimited resource limiter
26   *   RateLimiter limiter = new AverageIntervalRateLimiter();
27   *                         or new FixedIntervalRateLimiter();
28   *   limiter.set(10, TimeUnit.SECONDS);       // set 10 resources/sec
29   *
30   *   while (true) {
31   *     // call canExecute before performing resource consuming operation
32   *     bool canExecute = limiter.canExecute();
33   *     // If there are no available resources, wait until one is available
34   *     if (!canExecute) Thread.sleep(limiter.waitInterval());
35   *     // ...execute the work and consume the resource...
36   *     limiter.consume();
37   *   }
38   */
39  @InterfaceAudience.Private
40  @InterfaceStability.Evolving
41  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
42    justification="FindBugs seems confused; says limit and tlimit " +
43    "are mostly synchronized...but to me it looks like they are totally synchronized")
44  public abstract class RateLimiter {
45    public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
46    private long tunit = 1000;           // Timeunit factor for translating to ms.
47    private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
48    private long avail = Long.MAX_VALUE; // Currently available resource units
49  
50    /**
51     * Refill the available units w.r.t the elapsed time.
52     * @param limit Maximum available resource units that can be refilled to.
53     * @return how many resource units may be refilled ?
54     */
55    abstract long refill(long limit);
56  
57    /**
58     * Time in milliseconds to wait for before requesting to consume 'amount' resource.
59     * @param limit Maximum available resource units that can be refilled to.
60     * @param available Currently available resource units
61     * @param amount Resources for which time interval to calculate for
62     * @return estimate of the ms required to wait before being able to provide 'amount' resources.
63     */
64    abstract long getWaitInterval(long limit, long available, long amount);
65  
66  
67    /**
68     * Set the RateLimiter max available resources and refill period.
69     * @param limit The max value available resource units can be refilled to.
70     * @param timeUnit Timeunit factor for translating to ms.
71     */
72    public synchronized void set(final long limit, final TimeUnit timeUnit) {
73      switch (timeUnit) {
74      case MILLISECONDS:
75        tunit = 1;
76        break;
77      case SECONDS:
78        tunit = 1000;
79        break;
80      case MINUTES:
81        tunit = 60 * 1000;
82        break;
83      case HOURS:
84        tunit = 60 * 60 * 1000;
85        break;
86      case DAYS:
87        tunit = 24 * 60 * 60 * 1000;
88        break;
89      default:
90        throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
91      }
92      this.limit = limit;
93      this.avail = limit;
94    }
95  
96    public String toString() {
97      String rateLimiter = this.getClass().getSimpleName();
98      if (getLimit() == Long.MAX_VALUE) {
99        return rateLimiter + "(Bypass)";
100     }
101     return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() +
102         " tunit=" + getTimeUnitInMillis() + ")";
103   }
104 
105   /**
106    * Sets the current instance of RateLimiter to a new values.
107    *
108    * if current limit is smaller than the new limit, bump up the available resources.
109    * Otherwise allow clients to use up the previously available resources.
110    */
111   public synchronized void update(final RateLimiter other) {
112     this.tunit = other.tunit;
113     if (this.limit < other.limit) {
114       // If avail is capped to this.limit, it will never overflow,
115       // otherwise, avail may overflow, just be careful here.
116       long diff = other.limit - this.limit;
117       if (this.avail <= Long.MAX_VALUE - diff) {
118         this.avail += diff;
119         this.avail = Math.min(this.avail, other.limit);
120       } else {
121         this.avail = other.limit;
122       }
123     }
124     this.limit = other.limit;
125   }
126 
127   public synchronized boolean isBypass() {
128     return getLimit() == Long.MAX_VALUE;
129   }
130 
131   public synchronized long getLimit() {
132     return limit;
133   }
134 
135   public synchronized long getAvailable() {
136     return avail;
137   }
138 
139   protected synchronized long getTimeUnitInMillis() {
140     return tunit;
141   }
142 
143   /**
144    * Is there at least one resource available to allow execution?
145    * @return true if there is at least one resource available, otherwise false
146    */
147   public boolean canExecute() {
148     return canExecute(1);
149   }
150 
151   /**
152    * Are there enough available resources to allow execution?
153    * @param amount the number of required resources, a non-negative number
154    * @return true if there are enough available resources, otherwise false
155    */
156   public synchronized boolean canExecute(final long amount) {
157     if (isBypass()) {
158       return true;
159     }
160 
161     long refillAmount = refill(limit);
162     if (refillAmount == 0 && avail < amount) {
163       return false;
164     }
165     // check for positive overflow
166     if (avail <= Long.MAX_VALUE - refillAmount) {
167       avail = Math.max(0, Math.min(avail + refillAmount, limit));
168     } else {
169       avail = Math.max(0, limit);
170     }
171     if (avail >= amount) {
172       return true;
173     }
174     return false;
175   }
176 
177   /**
178    * consume one available unit.
179    */
180   public void consume() {
181     consume(1);
182   }
183 
184   /**
185    * consume amount available units, amount could be a negative number
186    * @param amount the number of units to consume
187    */
188   public synchronized void consume(final long amount) {
189 
190     if (isBypass()) {
191       return;
192     }
193 
194     if (amount >= 0 ) {
195       this.avail -= amount;
196       if (this.avail < 0) {
197         this.avail = 0;
198       }
199     } else {
200       if (this.avail <= Long.MAX_VALUE + amount) {
201         this.avail -= amount;
202         this.avail = Math.min(this.avail, this.limit);
203       } else {
204         this.avail = this.limit;
205       }
206     }
207   }
208 
209   /**
210    * @return estimate of the ms required to wait before being able to provide 1 resource.
211    */
212   public long waitInterval() {
213     return waitInterval(1);
214   }
215 
216   /**
217    * @return estimate of the ms required to wait before being able to provide "amount" resources.
218    */
219   public synchronized long waitInterval(final long amount) {
220     // TODO Handle over quota?
221     return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
222   }
223 
224   // These two method are for strictly testing purpose only
225   @VisibleForTesting
226   public abstract void setNextRefillTime(long nextRefillTime);
227 
228   @VisibleForTesting
229   public abstract long getNextRefillTime();
230 }