1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3 * agreements. See the NOTICE file distributed with this work for additional information regarding
4 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9 * for the specific language governing permissions and limitations under the License.
10 */
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.util.concurrent.TimeUnit;
15
16 import org.apache.hadoop.hbase.classification.InterfaceAudience;
17 import org.apache.hadoop.hbase.classification.InterfaceStability;
18
19 import com.google.common.annotations.VisibleForTesting;
20
21 /**
22 * Simple rate limiter.
23 *
24 * Usage Example:
25 * // At this point you have a unlimited resource limiter
26 * RateLimiter limiter = new AverageIntervalRateLimiter();
27 * or new FixedIntervalRateLimiter();
28 * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec
29 *
30 * while (true) {
31 * // call canExecute before performing resource consuming operation
32 * bool canExecute = limiter.canExecute();
33 * // If there are no available resources, wait until one is available
34 * if (!canExecute) Thread.sleep(limiter.waitInterval());
35 * // ...execute the work and consume the resource...
36 * limiter.consume();
37 * }
38 */
39 @InterfaceAudience.Private
40 @InterfaceStability.Evolving
41 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
42 justification="FindBugs seems confused; says limit and tlimit " +
43 "are mostly synchronized...but to me it looks like they are totally synchronized")
44 public abstract class RateLimiter {
45 public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
46 private long tunit = 1000; // Timeunit factor for translating to ms.
47 private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
48 private long avail = Long.MAX_VALUE; // Currently available resource units
49
50 /**
51 * Refill the available units w.r.t the elapsed time.
52 * @param limit Maximum available resource units that can be refilled to.
53 * @return how many resource units may be refilled ?
54 */
55 abstract long refill(long limit);
56
57 /**
58 * Time in milliseconds to wait for before requesting to consume 'amount' resource.
59 * @param limit Maximum available resource units that can be refilled to.
60 * @param available Currently available resource units
61 * @param amount Resources for which time interval to calculate for
62 * @return estimate of the ms required to wait before being able to provide 'amount' resources.
63 */
64 abstract long getWaitInterval(long limit, long available, long amount);
65
66
67 /**
68 * Set the RateLimiter max available resources and refill period.
69 * @param limit The max value available resource units can be refilled to.
70 * @param timeUnit Timeunit factor for translating to ms.
71 */
72 public synchronized void set(final long limit, final TimeUnit timeUnit) {
73 switch (timeUnit) {
74 case MILLISECONDS:
75 tunit = 1;
76 break;
77 case SECONDS:
78 tunit = 1000;
79 break;
80 case MINUTES:
81 tunit = 60 * 1000;
82 break;
83 case HOURS:
84 tunit = 60 * 60 * 1000;
85 break;
86 case DAYS:
87 tunit = 24 * 60 * 60 * 1000;
88 break;
89 default:
90 throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
91 }
92 this.limit = limit;
93 this.avail = limit;
94 }
95
96 public String toString() {
97 String rateLimiter = this.getClass().getSimpleName();
98 if (getLimit() == Long.MAX_VALUE) {
99 return rateLimiter + "(Bypass)";
100 }
101 return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() +
102 " tunit=" + getTimeUnitInMillis() + ")";
103 }
104
105 /**
106 * Sets the current instance of RateLimiter to a new values.
107 *
108 * if current limit is smaller than the new limit, bump up the available resources.
109 * Otherwise allow clients to use up the previously available resources.
110 */
111 public synchronized void update(final RateLimiter other) {
112 this.tunit = other.tunit;
113 if (this.limit < other.limit) {
114 // If avail is capped to this.limit, it will never overflow,
115 // otherwise, avail may overflow, just be careful here.
116 long diff = other.limit - this.limit;
117 if (this.avail <= Long.MAX_VALUE - diff) {
118 this.avail += diff;
119 this.avail = Math.min(this.avail, other.limit);
120 } else {
121 this.avail = other.limit;
122 }
123 }
124 this.limit = other.limit;
125 }
126
127 public synchronized boolean isBypass() {
128 return getLimit() == Long.MAX_VALUE;
129 }
130
131 public synchronized long getLimit() {
132 return limit;
133 }
134
135 public synchronized long getAvailable() {
136 return avail;
137 }
138
139 protected synchronized long getTimeUnitInMillis() {
140 return tunit;
141 }
142
143 /**
144 * Is there at least one resource available to allow execution?
145 * @return true if there is at least one resource available, otherwise false
146 */
147 public boolean canExecute() {
148 return canExecute(1);
149 }
150
151 /**
152 * Are there enough available resources to allow execution?
153 * @param amount the number of required resources, a non-negative number
154 * @return true if there are enough available resources, otherwise false
155 */
156 public synchronized boolean canExecute(final long amount) {
157 if (isBypass()) {
158 return true;
159 }
160
161 long refillAmount = refill(limit);
162 if (refillAmount == 0 && avail < amount) {
163 return false;
164 }
165 // check for positive overflow
166 if (avail <= Long.MAX_VALUE - refillAmount) {
167 avail = Math.max(0, Math.min(avail + refillAmount, limit));
168 } else {
169 avail = Math.max(0, limit);
170 }
171 if (avail >= amount) {
172 return true;
173 }
174 return false;
175 }
176
177 /**
178 * consume one available unit.
179 */
180 public void consume() {
181 consume(1);
182 }
183
184 /**
185 * consume amount available units, amount could be a negative number
186 * @param amount the number of units to consume
187 */
188 public synchronized void consume(final long amount) {
189
190 if (isBypass()) {
191 return;
192 }
193
194 if (amount >= 0 ) {
195 this.avail -= amount;
196 if (this.avail < 0) {
197 this.avail = 0;
198 }
199 } else {
200 if (this.avail <= Long.MAX_VALUE + amount) {
201 this.avail -= amount;
202 this.avail = Math.min(this.avail, this.limit);
203 } else {
204 this.avail = this.limit;
205 }
206 }
207 }
208
209 /**
210 * @return estimate of the ms required to wait before being able to provide 1 resource.
211 */
212 public long waitInterval() {
213 return waitInterval(1);
214 }
215
216 /**
217 * @return estimate of the ms required to wait before being able to provide "amount" resources.
218 */
219 public synchronized long waitInterval(final long amount) {
220 // TODO Handle over quota?
221 return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
222 }
223
224 // These two method are for strictly testing purpose only
225 @VisibleForTesting
226 public abstract void setNextRefillTime(long nextRefillTime);
227
228 @VisibleForTesting
229 public abstract long getNextRefillTime();
230 }