001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.util.concurrent.TimeUnit; 021import org.apache.yetus.audience.InterfaceAudience; 022import org.apache.yetus.audience.InterfaceStability; 023 024/** 025 * Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter 026 * RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter(); 027 * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0) 028 * { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the 029 * work and consume the resource... limiter.consume(); 030 */ 031@InterfaceAudience.Private 032@InterfaceStability.Evolving 033@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 034 justification = "FindBugs seems confused; says limit and tlimit " 035 + "are mostly synchronized...but to me it looks like they are totally synchronized") 036public abstract class RateLimiter { 037 public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter"; 038 public static final long DEFAULT_TIME_UNIT = 1000; 039 040 private long tunit = DEFAULT_TIME_UNIT; // Timeunit factor for translating to ms. 041 private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. 042 private long avail = Long.MAX_VALUE; // Currently available resource units 043 044 /** 045 * Refill the available units w.r.t the elapsed time. 046 * @param limit Maximum available resource units that can be refilled to. 047 * @return how many resource units may be refilled ? 048 */ 049 abstract long refill(long limit); 050 051 /** 052 * Time in milliseconds to wait for before requesting to consume 'amount' resource. 053 * @param limit Maximum available resource units that can be refilled to. 054 * @param available Currently available resource units 055 * @param amount Resources for which time interval to calculate for 056 * @return estimate of the ms required to wait before being able to provide 'amount' resources. 057 */ 058 abstract long getWaitInterval(long limit, long available, long amount); 059 060 /** 061 * Set the RateLimiter max available resources and refill period. 062 * @param limit The max value available resource units can be refilled to. 063 * @param timeUnit Timeunit factor for translating to ms. 064 */ 065 public synchronized void set(final long limit, final TimeUnit timeUnit) { 066 switch (timeUnit) { 067 case MILLISECONDS: 068 tunit = 1; 069 break; 070 case SECONDS: 071 tunit = 1000; 072 break; 073 case MINUTES: 074 tunit = 60 * 1000; 075 break; 076 case HOURS: 077 tunit = 60 * 60 * 1000; 078 break; 079 case DAYS: 080 tunit = 24 * 60 * 60 * 1000; 081 break; 082 default: 083 throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit."); 084 } 085 this.limit = limit; 086 this.avail = limit; 087 } 088 089 @Override 090 public String toString() { 091 String rateLimiter = this.getClass().getSimpleName(); 092 if (getLimit() == Long.MAX_VALUE) { 093 return rateLimiter + "(Bypass)"; 094 } 095 return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() + " tunit=" 096 + getTimeUnitInMillis() + ")"; 097 } 098 099 /** 100 * Sets the current instance of RateLimiter to a new values. if current limit is smaller than the 101 * new limit, bump up the available resources. Otherwise allow clients to use up the previously 102 * available resources. 103 */ 104 public synchronized void update(final RateLimiter other) { 105 this.tunit = other.tunit; 106 if (this.limit < other.limit) { 107 // If avail is capped to this.limit, it will never overflow, 108 // otherwise, avail may overflow, just be careful here. 109 long diff = other.limit - this.limit; 110 if (this.avail <= Long.MAX_VALUE - diff) { 111 this.avail += diff; 112 this.avail = Math.min(this.avail, other.limit); 113 } else { 114 this.avail = other.limit; 115 } 116 } 117 this.limit = other.limit; 118 } 119 120 public synchronized boolean isBypass() { 121 return getLimit() == Long.MAX_VALUE; 122 } 123 124 public synchronized long getLimit() { 125 return limit; 126 } 127 128 public synchronized long getAvailable() { 129 return avail; 130 } 131 132 protected synchronized long getTimeUnitInMillis() { 133 return tunit; 134 } 135 136 /** 137 * Is there at least one resource available to allow execution? 138 * @return the waitInterval to backoff, or 0 if execution is allowed 139 */ 140 public long getWaitIntervalMs() { 141 return getWaitIntervalMs(1); 142 } 143 144 /** 145 * Are there enough available resources to allow execution? 146 * @param amount the number of required resources, a non-negative number 147 * @return the waitInterval to backoff, or 0 if execution is allowed 148 */ 149 public synchronized long getWaitIntervalMs(final long amount) { 150 assert amount >= 0; 151 if (!isAvailable(amount)) { 152 return waitInterval(amount); 153 } 154 return 0; 155 } 156 157 /** 158 * Are there enough available resources to allow execution? 159 * @param amount the number of required resources, a non-negative number 160 * @return true if there are enough available resources, otherwise false 161 */ 162 protected boolean isAvailable(final long amount) { 163 if (isBypass()) { 164 return true; 165 } 166 167 long refillAmount = refill(limit); 168 if (refillAmount == 0 && avail < amount) { 169 return false; 170 } 171 // check for positive overflow 172 if (avail <= Long.MAX_VALUE - refillAmount) { 173 avail = Math.min(avail + refillAmount, limit); 174 } else { 175 avail = limit; 176 } 177 if (avail >= amount) { 178 return true; 179 } 180 return false; 181 } 182 183 /** 184 * consume one available unit. 185 */ 186 public void consume() { 187 consume(1); 188 } 189 190 /** 191 * consume amount available units, amount could be a negative number 192 * @param amount the number of units to consume 193 */ 194 public synchronized void consume(final long amount) { 195 196 if (isBypass()) { 197 return; 198 } 199 200 if (amount >= 0) { 201 this.avail -= amount; 202 } else { 203 if (this.avail <= Long.MAX_VALUE + amount) { 204 this.avail -= amount; 205 this.avail = Math.min(this.avail, this.limit); 206 } else { 207 this.avail = this.limit; 208 } 209 } 210 } 211 212 /** Returns estimate of the ms required to wait before being able to provide 1 resource. */ 213 public long waitInterval() { 214 return waitInterval(1); 215 } 216 217 /** 218 * Returns estimate of the ms required to wait before being able to provide "amount" resources. 219 */ 220 public synchronized long waitInterval(final long amount) { 221 // TODO Handle over quota? 222 return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount); 223 } 224 225 // These two method are for strictly testing purpose only 226 227 public abstract void setNextRefillTime(long nextRefillTime); 228 229 public abstract long getNextRefillTime(); 230}