001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.yetus.audience.InterfaceAudience;
022import org.apache.yetus.audience.InterfaceStability;
023
024import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
025import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
026import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
027
028/**
029 * Simple time based limiter that checks the quota Throttle
030 */
031@InterfaceAudience.Private
032@InterfaceStability.Evolving
033public class TimeBasedLimiter implements QuotaLimiter {
034  private RateLimiter reqsLimiter = null;
035  private RateLimiter reqSizeLimiter = null;
036  private RateLimiter writeReqsLimiter = null;
037  private RateLimiter writeSizeLimiter = null;
038  private RateLimiter readReqsLimiter = null;
039  private RateLimiter readSizeLimiter = null;
040  private RateLimiter reqCapacityUnitLimiter = null;
041  private RateLimiter writeCapacityUnitLimiter = null;
042  private RateLimiter readCapacityUnitLimiter = null;
043  private RateLimiter atomicReqLimiter = null;
044  private RateLimiter atomicReadSizeLimiter = null;
045  private RateLimiter atomicWriteSizeLimiter = null;
046  private RateLimiter reqHandlerUsageTimeLimiter = null;
047
048  private TimeBasedLimiter(Configuration conf) {
049    if (
050      FixedIntervalRateLimiter.class.getName().equals(
051        conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
052          .getName())
053    ) {
054      long refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
055        RateLimiter.DEFAULT_TIME_UNIT);
056      reqsLimiter = new FixedIntervalRateLimiter(refillInterval);
057      reqSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
058      writeReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
059      writeSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
060      readReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
061      readSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
062      reqCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
063      writeCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
064      readCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
065      atomicReqLimiter = new FixedIntervalRateLimiter(refillInterval);
066      atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
067      atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
068      reqHandlerUsageTimeLimiter = new FixedIntervalRateLimiter(refillInterval);
069    } else {
070      reqsLimiter = new AverageIntervalRateLimiter();
071      reqSizeLimiter = new AverageIntervalRateLimiter();
072      writeReqsLimiter = new AverageIntervalRateLimiter();
073      writeSizeLimiter = new AverageIntervalRateLimiter();
074      readReqsLimiter = new AverageIntervalRateLimiter();
075      readSizeLimiter = new AverageIntervalRateLimiter();
076      reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
077      writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
078      readCapacityUnitLimiter = new AverageIntervalRateLimiter();
079      atomicReqLimiter = new AverageIntervalRateLimiter();
080      atomicReadSizeLimiter = new AverageIntervalRateLimiter();
081      atomicWriteSizeLimiter = new AverageIntervalRateLimiter();
082      reqHandlerUsageTimeLimiter = new AverageIntervalRateLimiter();
083    }
084  }
085
086  static QuotaLimiter fromThrottle(Configuration conf, final Throttle throttle) {
087    TimeBasedLimiter limiter = new TimeBasedLimiter(conf);
088    boolean isBypass = true;
089    if (throttle.hasReqNum()) {
090      setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
091      isBypass = false;
092    }
093
094    if (throttle.hasReqSize()) {
095      setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize());
096      isBypass = false;
097    }
098
099    if (throttle.hasWriteNum()) {
100      setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum());
101      isBypass = false;
102    }
103
104    if (throttle.hasWriteSize()) {
105      setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize());
106      isBypass = false;
107    }
108
109    if (throttle.hasReadNum()) {
110      setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum());
111      isBypass = false;
112    }
113
114    if (throttle.hasReadSize()) {
115      setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
116      isBypass = false;
117    }
118
119    if (throttle.hasReqCapacityUnit()) {
120      setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
121      isBypass = false;
122    }
123
124    if (throttle.hasWriteCapacityUnit()) {
125      setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
126      isBypass = false;
127    }
128
129    if (throttle.hasReadCapacityUnit()) {
130      setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
131      isBypass = false;
132    }
133
134    if (throttle.hasAtomicReqNum()) {
135      setFromTimedQuota(limiter.atomicReqLimiter, throttle.getAtomicReqNum());
136      isBypass = false;
137    }
138
139    if (throttle.hasAtomicReadSize()) {
140      setFromTimedQuota(limiter.atomicReadSizeLimiter, throttle.getAtomicReadSize());
141      isBypass = false;
142    }
143
144    if (throttle.hasAtomicWriteSize()) {
145      setFromTimedQuota(limiter.atomicWriteSizeLimiter, throttle.getAtomicWriteSize());
146      isBypass = false;
147    }
148
149    if (throttle.hasReqHandlerUsageMs()) {
150      setFromTimedQuota(limiter.reqHandlerUsageTimeLimiter, throttle.getReqHandlerUsageMs());
151      isBypass = false;
152    }
153
154    return isBypass ? NoopQuotaLimiter.get() : limiter;
155  }
156
157  public void update(final TimeBasedLimiter other) {
158    reqsLimiter.update(other.reqsLimiter);
159    reqSizeLimiter.update(other.reqSizeLimiter);
160    writeReqsLimiter.update(other.writeReqsLimiter);
161    writeSizeLimiter.update(other.writeSizeLimiter);
162    readReqsLimiter.update(other.readReqsLimiter);
163    readSizeLimiter.update(other.readSizeLimiter);
164    reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
165    writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
166    readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
167    atomicReqLimiter.update(other.atomicReqLimiter);
168    atomicReadSizeLimiter.update(other.atomicReadSizeLimiter);
169    atomicWriteSizeLimiter.update(other.atomicWriteSizeLimiter);
170    reqHandlerUsageTimeLimiter.update(other.reqHandlerUsageTimeLimiter);
171  }
172
173  private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
174    limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()));
175  }
176
177  @Override
178  public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
179    long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit,
180    boolean isAtomic, long estimatedReqHandlerUsageTimeMs) throws RpcThrottlingException {
181    long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
182    if (waitInterval > 0) {
183      RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
184    }
185    waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
186    if (waitInterval > 0) {
187      RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
188    }
189    waitInterval = reqCapacityUnitLimiter
190      .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
191    if (waitInterval > 0) {
192      RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
193    }
194    if (isAtomic) {
195      waitInterval = atomicReqLimiter.getWaitIntervalMs(writeReqs + readReqs);
196      if (waitInterval > 0) {
197        RpcThrottlingException.throwAtomicRequestNumberExceeded(waitInterval);
198      }
199    }
200
201    if (estimateWriteSize > 0) {
202      waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
203      if (waitInterval > 0) {
204        RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
205      }
206      waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
207      if (waitInterval > 0) {
208        RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
209      }
210      waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
211      if (waitInterval > 0) {
212        RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
213      }
214      if (isAtomic) {
215        waitInterval = atomicWriteSizeLimiter.getWaitIntervalMs(writeReqs);
216        if (waitInterval > 0) {
217          RpcThrottlingException.throwAtomicWriteSizeExceeded(waitInterval);
218        }
219      }
220    }
221
222    if (estimateReadSize > 0) {
223      waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
224      if (waitInterval > 0) {
225        RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
226      }
227      waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
228      if (waitInterval > 0) {
229        RpcThrottlingException.throwReadSizeExceeded(waitInterval);
230      }
231      waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
232      if (waitInterval > 0) {
233        RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
234      }
235      if (isAtomic) {
236        waitInterval = atomicReadSizeLimiter.getWaitIntervalMs(writeReqs + readReqs);
237        if (waitInterval > 0) {
238          RpcThrottlingException.throwAtomicReadSizeExceeded(waitInterval);
239        }
240      }
241    }
242    waitInterval = reqHandlerUsageTimeLimiter.getWaitIntervalMs(estimatedReqHandlerUsageTimeMs);
243    if (waitInterval > 0) {
244      RpcThrottlingException.throwRequestHandlerUsageTimeExceeded(waitInterval);
245    }
246  }
247
248  @Override
249  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
250    long writeCapacityUnit, long readCapacityUnit, boolean isAtomic,
251    long estimateHandlerThreadUsageMs) {
252    assert writeSize != 0 || readSize != 0;
253
254    reqsLimiter.consume(writeReqs + readReqs);
255    reqSizeLimiter.consume(writeSize + readSize);
256
257    if (writeSize > 0) {
258      writeReqsLimiter.consume(writeReqs);
259      writeSizeLimiter.consume(writeSize);
260    }
261    if (readSize > 0) {
262      readReqsLimiter.consume(readReqs);
263      readSizeLimiter.consume(readSize);
264    }
265    if (writeCapacityUnit > 0) {
266      reqCapacityUnitLimiter.consume(writeCapacityUnit);
267      writeCapacityUnitLimiter.consume(writeCapacityUnit);
268    }
269    if (readCapacityUnit > 0) {
270      reqCapacityUnitLimiter.consume(readCapacityUnit);
271      readCapacityUnitLimiter.consume(readCapacityUnit);
272    }
273    if (isAtomic) {
274      atomicReqLimiter.consume(writeReqs + readReqs);
275      if (readSize > 0) {
276        atomicReadSizeLimiter.consume(readSize);
277      }
278      if (writeSize > 0) {
279        atomicWriteSizeLimiter.consume(writeSize);
280      }
281    }
282    reqHandlerUsageTimeLimiter.consume(estimateHandlerThreadUsageMs);
283  }
284
285  @Override
286  public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) {
287    reqSizeLimiter.consume(size);
288    writeSizeLimiter.consume(size);
289    reqCapacityUnitLimiter.consume(capacityUnit);
290    writeCapacityUnitLimiter.consume(capacityUnit);
291    if (isAtomic) {
292      atomicWriteSizeLimiter.consume(size);
293    }
294  }
295
296  @Override
297  public void consumeRead(final long size, long capacityUnit, boolean isAtomic) {
298    reqSizeLimiter.consume(size);
299    readSizeLimiter.consume(size);
300    reqCapacityUnitLimiter.consume(capacityUnit);
301    readCapacityUnitLimiter.consume(capacityUnit);
302    if (isAtomic) {
303      atomicReadSizeLimiter.consume(size);
304    }
305  }
306
307  @Override
308  public void consumeTime(final long handlerMillisUsed) {
309    reqHandlerUsageTimeLimiter.consume(handlerMillisUsed);
310  }
311
312  @Override
313  public boolean isBypass() {
314    return false;
315  }
316
317  @Override
318  public long getWriteAvailable() {
319    return writeSizeLimiter.getAvailable();
320  }
321
322  @Override
323  public long getRequestNumLimit() {
324    long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();
325
326    if (readAndWriteLimit < 0) { // handle overflow
327      readAndWriteLimit = Long.MAX_VALUE;
328    }
329
330    return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
331  }
332
333  @Override
334  public long getReadNumLimit() {
335    return readReqsLimiter.getLimit();
336  }
337
338  @Override
339  public long getWriteNumLimit() {
340    return writeReqsLimiter.getLimit();
341  }
342
343  @Override
344  public long getReadAvailable() {
345    return readSizeLimiter.getAvailable();
346  }
347
348  @Override
349  public long getReadLimit() {
350    return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
351  }
352
353  @Override
354  public long getWriteLimit() {
355    return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
356  }
357
358  @Override
359  public String toString() {
360    StringBuilder builder = new StringBuilder();
361    builder.append("TimeBasedLimiter(");
362    if (!reqsLimiter.isBypass()) {
363      builder.append("reqs=" + reqsLimiter);
364    }
365    if (!reqSizeLimiter.isBypass()) {
366      builder.append(" resSize=" + reqSizeLimiter);
367    }
368    if (!writeReqsLimiter.isBypass()) {
369      builder.append(" writeReqs=" + writeReqsLimiter);
370    }
371    if (!writeSizeLimiter.isBypass()) {
372      builder.append(" writeSize=" + writeSizeLimiter);
373    }
374    if (!readReqsLimiter.isBypass()) {
375      builder.append(" readReqs=" + readReqsLimiter);
376    }
377    if (!readSizeLimiter.isBypass()) {
378      builder.append(" readSize=" + readSizeLimiter);
379    }
380    if (!reqCapacityUnitLimiter.isBypass()) {
381      builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
382    }
383    if (!writeCapacityUnitLimiter.isBypass()) {
384      builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
385    }
386    if (!readCapacityUnitLimiter.isBypass()) {
387      builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
388    }
389    if (!atomicReqLimiter.isBypass()) {
390      builder.append(" atomicReqLimiter=" + atomicReqLimiter);
391    }
392    if (!atomicReadSizeLimiter.isBypass()) {
393      builder.append(" atomicReadSizeLimiter=" + atomicReadSizeLimiter);
394    }
395    if (!atomicWriteSizeLimiter.isBypass()) {
396      builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter);
397    }
398    if (!reqHandlerUsageTimeLimiter.isBypass()) {
399      builder.append(" reqHandlerUsageTimeLimiter=" + reqHandlerUsageTimeLimiter);
400    }
401    builder.append(')');
402    return builder.toString();
403  }
404}