001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.util.concurrent.TimeUnit; 022 023import org.apache.yetus.audience.InterfaceAudience; 024import org.apache.yetus.audience.InterfaceStability; 025 026import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 027 028/** 029 * Simple rate limiter. 030 * 031 * Usage Example: 032 * // At this point you have a unlimited resource limiter 033 * RateLimiter limiter = new AverageIntervalRateLimiter(); 034 * or new FixedIntervalRateLimiter(); 035 * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec 036 * 037 * while (true) { 038 * // call canExecute before performing resource consuming operation 039 * bool canExecute = limiter.canExecute(); 040 * // If there are no available resources, wait until one is available 041 * if (!canExecute) Thread.sleep(limiter.waitInterval()); 042 * // ...execute the work and consume the resource... 043 * limiter.consume(); 044 * } 045 */ 046@InterfaceAudience.Private 047@InterfaceStability.Evolving 048@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", 049 justification="FindBugs seems confused; says limit and tlimit " + 050 "are mostly synchronized...but to me it looks like they are totally synchronized") 051public abstract class RateLimiter { 052 public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter"; 053 private long tunit = 1000; // Timeunit factor for translating to ms. 054 private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. 055 private long avail = Long.MAX_VALUE; // Currently available resource units 056 057 /** 058 * Refill the available units w.r.t the elapsed time. 059 * @param limit Maximum available resource units that can be refilled to. 060 * @return how many resource units may be refilled ? 061 */ 062 abstract long refill(long limit); 063 064 /** 065 * Time in milliseconds to wait for before requesting to consume 'amount' resource. 066 * @param limit Maximum available resource units that can be refilled to. 067 * @param available Currently available resource units 068 * @param amount Resources for which time interval to calculate for 069 * @return estimate of the ms required to wait before being able to provide 'amount' resources. 070 */ 071 abstract long getWaitInterval(long limit, long available, long amount); 072 073 074 /** 075 * Set the RateLimiter max available resources and refill period. 076 * @param limit The max value available resource units can be refilled to. 077 * @param timeUnit Timeunit factor for translating to ms. 078 */ 079 public synchronized void set(final long limit, final TimeUnit timeUnit) { 080 switch (timeUnit) { 081 case MILLISECONDS: 082 tunit = 1; 083 break; 084 case SECONDS: 085 tunit = 1000; 086 break; 087 case MINUTES: 088 tunit = 60 * 1000; 089 break; 090 case HOURS: 091 tunit = 60 * 60 * 1000; 092 break; 093 case DAYS: 094 tunit = 24 * 60 * 60 * 1000; 095 break; 096 default: 097 throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit."); 098 } 099 this.limit = limit; 100 this.avail = limit; 101 } 102 103 @Override 104 public String toString() { 105 String rateLimiter = this.getClass().getSimpleName(); 106 if (getLimit() == Long.MAX_VALUE) { 107 return rateLimiter + "(Bypass)"; 108 } 109 return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() + 110 " tunit=" + getTimeUnitInMillis() + ")"; 111 } 112 113 /** 114 * Sets the current instance of RateLimiter to a new values. 115 * 116 * if current limit is smaller than the new limit, bump up the available resources. 117 * Otherwise allow clients to use up the previously available resources. 118 */ 119 public synchronized void update(final RateLimiter other) { 120 this.tunit = other.tunit; 121 if (this.limit < other.limit) { 122 // If avail is capped to this.limit, it will never overflow, 123 // otherwise, avail may overflow, just be careful here. 124 long diff = other.limit - this.limit; 125 if (this.avail <= Long.MAX_VALUE - diff) { 126 this.avail += diff; 127 this.avail = Math.min(this.avail, other.limit); 128 } else { 129 this.avail = other.limit; 130 } 131 } 132 this.limit = other.limit; 133 } 134 135 public synchronized boolean isBypass() { 136 return getLimit() == Long.MAX_VALUE; 137 } 138 139 public synchronized long getLimit() { 140 return limit; 141 } 142 143 public synchronized long getAvailable() { 144 return avail; 145 } 146 147 protected synchronized long getTimeUnitInMillis() { 148 return tunit; 149 } 150 151 /** 152 * Is there at least one resource available to allow execution? 153 * @return true if there is at least one resource available, otherwise false 154 */ 155 public boolean canExecute() { 156 return canExecute(1); 157 } 158 159 /** 160 * Are there enough available resources to allow execution? 161 * @param amount the number of required resources, a non-negative number 162 * @return true if there are enough available resources, otherwise false 163 */ 164 public synchronized boolean canExecute(final long amount) { 165 if (isBypass()) { 166 return true; 167 } 168 169 long refillAmount = refill(limit); 170 if (refillAmount == 0 && avail < amount) { 171 return false; 172 } 173 // check for positive overflow 174 if (avail <= Long.MAX_VALUE - refillAmount) { 175 avail = Math.max(0, Math.min(avail + refillAmount, limit)); 176 } else { 177 avail = Math.max(0, limit); 178 } 179 if (avail >= amount) { 180 return true; 181 } 182 return false; 183 } 184 185 /** 186 * consume one available unit. 187 */ 188 public void consume() { 189 consume(1); 190 } 191 192 /** 193 * consume amount available units, amount could be a negative number 194 * @param amount the number of units to consume 195 */ 196 public synchronized void consume(final long amount) { 197 198 if (isBypass()) { 199 return; 200 } 201 202 if (amount >= 0 ) { 203 this.avail -= amount; 204 if (this.avail < 0) { 205 this.avail = 0; 206 } 207 } else { 208 if (this.avail <= Long.MAX_VALUE + amount) { 209 this.avail -= amount; 210 this.avail = Math.min(this.avail, this.limit); 211 } else { 212 this.avail = this.limit; 213 } 214 } 215 } 216 217 /** 218 * @return estimate of the ms required to wait before being able to provide 1 resource. 219 */ 220 public long waitInterval() { 221 return waitInterval(1); 222 } 223 224 /** 225 * @return estimate of the ms required to wait before being able to provide "amount" resources. 226 */ 227 public synchronized long waitInterval(final long amount) { 228 // TODO Handle over quota? 229 return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount); 230 } 231 232 // These two method are for strictly testing purpose only 233 @VisibleForTesting 234 public abstract void setNextRefillTime(long nextRefillTime); 235 236 @VisibleForTesting 237 public abstract long getNextRefillTime(); 238}