001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.HBaseConfiguration;
022import org.apache.yetus.audience.InterfaceAudience;
023import org.apache.yetus.audience.InterfaceStability;
024
025import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
026import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
027import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
028
029/**
030 * Simple time based limiter that checks the quota Throttle
031 */
032@InterfaceAudience.Private
033@InterfaceStability.Evolving
034public class TimeBasedLimiter implements QuotaLimiter {
035  private static final Configuration conf = HBaseConfiguration.create();
036  private RateLimiter reqsLimiter = null;
037  private RateLimiter reqSizeLimiter = null;
038  private RateLimiter writeReqsLimiter = null;
039  private RateLimiter writeSizeLimiter = null;
040  private RateLimiter readReqsLimiter = null;
041  private RateLimiter readSizeLimiter = null;
042  private RateLimiter reqCapacityUnitLimiter = null;
043  private RateLimiter writeCapacityUnitLimiter = null;
044  private RateLimiter readCapacityUnitLimiter = null;
045  private RateLimiter atomicReqLimiter = null;
046  private RateLimiter atomicReadSizeLimiter = null;
047  private RateLimiter atomicWriteSizeLimiter = null;
048
049  private TimeBasedLimiter() {
050    if (
051      FixedIntervalRateLimiter.class.getName().equals(
052        conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
053          .getName())
054    ) {
055      long refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
056        RateLimiter.DEFAULT_TIME_UNIT);
057      reqsLimiter = new FixedIntervalRateLimiter(refillInterval);
058      reqSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
059      writeReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
060      writeSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
061      readReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
062      readSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
063      reqCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
064      writeCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
065      readCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
066      atomicReqLimiter = new FixedIntervalRateLimiter(refillInterval);
067      atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
068      atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
069    } else {
070      reqsLimiter = new AverageIntervalRateLimiter();
071      reqSizeLimiter = new AverageIntervalRateLimiter();
072      writeReqsLimiter = new AverageIntervalRateLimiter();
073      writeSizeLimiter = new AverageIntervalRateLimiter();
074      readReqsLimiter = new AverageIntervalRateLimiter();
075      readSizeLimiter = new AverageIntervalRateLimiter();
076      reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
077      writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
078      readCapacityUnitLimiter = new AverageIntervalRateLimiter();
079      atomicReqLimiter = new AverageIntervalRateLimiter();
080      atomicReadSizeLimiter = new AverageIntervalRateLimiter();
081      atomicWriteSizeLimiter = new AverageIntervalRateLimiter();
082    }
083  }
084
085  static QuotaLimiter fromThrottle(final Throttle throttle) {
086    TimeBasedLimiter limiter = new TimeBasedLimiter();
087    boolean isBypass = true;
088    if (throttle.hasReqNum()) {
089      setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
090      isBypass = false;
091    }
092
093    if (throttle.hasReqSize()) {
094      setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize());
095      isBypass = false;
096    }
097
098    if (throttle.hasWriteNum()) {
099      setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum());
100      isBypass = false;
101    }
102
103    if (throttle.hasWriteSize()) {
104      setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize());
105      isBypass = false;
106    }
107
108    if (throttle.hasReadNum()) {
109      setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum());
110      isBypass = false;
111    }
112
113    if (throttle.hasReadSize()) {
114      setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
115      isBypass = false;
116    }
117
118    if (throttle.hasReqCapacityUnit()) {
119      setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
120      isBypass = false;
121    }
122
123    if (throttle.hasWriteCapacityUnit()) {
124      setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
125      isBypass = false;
126    }
127
128    if (throttle.hasReadCapacityUnit()) {
129      setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
130      isBypass = false;
131    }
132
133    if (throttle.hasAtomicReqNum()) {
134      setFromTimedQuota(limiter.atomicReqLimiter, throttle.getAtomicReqNum());
135      isBypass = false;
136    }
137
138    if (throttle.hasAtomicReadSize()) {
139      setFromTimedQuota(limiter.atomicReadSizeLimiter, throttle.getAtomicReadSize());
140      isBypass = false;
141    }
142
143    if (throttle.hasAtomicWriteSize()) {
144      setFromTimedQuota(limiter.atomicWriteSizeLimiter, throttle.getAtomicWriteSize());
145      isBypass = false;
146    }
147
148    return isBypass ? NoopQuotaLimiter.get() : limiter;
149  }
150
151  public void update(final TimeBasedLimiter other) {
152    reqsLimiter.update(other.reqsLimiter);
153    reqSizeLimiter.update(other.reqSizeLimiter);
154    writeReqsLimiter.update(other.writeReqsLimiter);
155    writeSizeLimiter.update(other.writeSizeLimiter);
156    readReqsLimiter.update(other.readReqsLimiter);
157    readSizeLimiter.update(other.readSizeLimiter);
158    reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
159    writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
160    readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
161    atomicReqLimiter.update(other.atomicReqLimiter);
162    atomicReadSizeLimiter.update(other.atomicReadSizeLimiter);
163    atomicWriteSizeLimiter.update(other.atomicWriteSizeLimiter);
164  }
165
166  private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
167    limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()));
168  }
169
170  @Override
171  public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
172    long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit,
173    boolean isAtomic) throws RpcThrottlingException {
174    long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
175    if (waitInterval > 0) {
176      RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
177    }
178    waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
179    if (waitInterval > 0) {
180      RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
181    }
182    waitInterval = reqCapacityUnitLimiter
183      .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
184    if (waitInterval > 0) {
185      RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
186    }
187    if (isAtomic) {
188      waitInterval = atomicReqLimiter.getWaitIntervalMs(writeReqs + readReqs);
189      if (waitInterval > 0) {
190        RpcThrottlingException.throwAtomicRequestNumberExceeded(waitInterval);
191      }
192    }
193
194    if (estimateWriteSize > 0) {
195      waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
196      if (waitInterval > 0) {
197        RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
198      }
199      waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
200      if (waitInterval > 0) {
201        RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
202      }
203      waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
204      if (waitInterval > 0) {
205        RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
206      }
207      if (isAtomic) {
208        waitInterval = atomicWriteSizeLimiter.getWaitIntervalMs(writeReqs);
209        if (waitInterval > 0) {
210          RpcThrottlingException.throwAtomicWriteSizeExceeded(waitInterval);
211        }
212      }
213    }
214
215    if (estimateReadSize > 0) {
216      waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
217      if (waitInterval > 0) {
218        RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
219      }
220      waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
221      if (waitInterval > 0) {
222        RpcThrottlingException.throwReadSizeExceeded(waitInterval);
223      }
224      waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
225      if (waitInterval > 0) {
226        RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
227      }
228      if (isAtomic) {
229        waitInterval = atomicReadSizeLimiter.getWaitIntervalMs(writeReqs + readReqs);
230        if (waitInterval > 0) {
231          RpcThrottlingException.throwAtomicReadSizeExceeded(waitInterval);
232        }
233      }
234    }
235  }
236
237  @Override
238  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
239    long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) {
240    assert writeSize != 0 || readSize != 0;
241
242    reqsLimiter.consume(writeReqs + readReqs);
243    reqSizeLimiter.consume(writeSize + readSize);
244
245    if (writeSize > 0) {
246      writeReqsLimiter.consume(writeReqs);
247      writeSizeLimiter.consume(writeSize);
248    }
249    if (readSize > 0) {
250      readReqsLimiter.consume(readReqs);
251      readSizeLimiter.consume(readSize);
252    }
253    if (writeCapacityUnit > 0) {
254      reqCapacityUnitLimiter.consume(writeCapacityUnit);
255      writeCapacityUnitLimiter.consume(writeCapacityUnit);
256    }
257    if (readCapacityUnit > 0) {
258      reqCapacityUnitLimiter.consume(readCapacityUnit);
259      readCapacityUnitLimiter.consume(readCapacityUnit);
260    }
261    if (isAtomic) {
262      atomicReqLimiter.consume(writeReqs + readReqs);
263      if (readSize > 0) {
264        atomicReadSizeLimiter.consume(readSize);
265      }
266      if (writeSize > 0) {
267        atomicWriteSizeLimiter.consume(writeSize);
268      }
269    }
270  }
271
272  @Override
273  public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) {
274    reqSizeLimiter.consume(size);
275    writeSizeLimiter.consume(size);
276    reqCapacityUnitLimiter.consume(capacityUnit);
277    writeCapacityUnitLimiter.consume(capacityUnit);
278    if (isAtomic) {
279      atomicWriteSizeLimiter.consume(size);
280    }
281  }
282
283  @Override
284  public void consumeRead(final long size, long capacityUnit, boolean isAtomic) {
285    reqSizeLimiter.consume(size);
286    readSizeLimiter.consume(size);
287    reqCapacityUnitLimiter.consume(capacityUnit);
288    readCapacityUnitLimiter.consume(capacityUnit);
289    if (isAtomic) {
290      atomicReadSizeLimiter.consume(size);
291    }
292  }
293
294  @Override
295  public boolean isBypass() {
296    return false;
297  }
298
299  @Override
300  public long getWriteAvailable() {
301    return writeSizeLimiter.getAvailable();
302  }
303
304  @Override
305  public long getRequestNumLimit() {
306    long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();
307
308    if (readAndWriteLimit < 0) { // handle overflow
309      readAndWriteLimit = Long.MAX_VALUE;
310    }
311
312    return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
313  }
314
315  @Override
316  public long getReadNumLimit() {
317    return readReqsLimiter.getLimit();
318  }
319
320  @Override
321  public long getWriteNumLimit() {
322    return writeReqsLimiter.getLimit();
323  }
324
325  @Override
326  public long getReadAvailable() {
327    return readSizeLimiter.getAvailable();
328  }
329
330  @Override
331  public long getReadLimit() {
332    return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
333  }
334
335  @Override
336  public long getWriteLimit() {
337    return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
338  }
339
340  @Override
341  public String toString() {
342    StringBuilder builder = new StringBuilder();
343    builder.append("TimeBasedLimiter(");
344    if (!reqsLimiter.isBypass()) {
345      builder.append("reqs=" + reqsLimiter);
346    }
347    if (!reqSizeLimiter.isBypass()) {
348      builder.append(" resSize=" + reqSizeLimiter);
349    }
350    if (!writeReqsLimiter.isBypass()) {
351      builder.append(" writeReqs=" + writeReqsLimiter);
352    }
353    if (!writeSizeLimiter.isBypass()) {
354      builder.append(" writeSize=" + writeSizeLimiter);
355    }
356    if (!readReqsLimiter.isBypass()) {
357      builder.append(" readReqs=" + readReqsLimiter);
358    }
359    if (!readSizeLimiter.isBypass()) {
360      builder.append(" readSize=" + readSizeLimiter);
361    }
362    if (!reqCapacityUnitLimiter.isBypass()) {
363      builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
364    }
365    if (!writeCapacityUnitLimiter.isBypass()) {
366      builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
367    }
368    if (!readCapacityUnitLimiter.isBypass()) {
369      builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
370    }
371    if (!atomicReqLimiter.isBypass()) {
372      builder.append(" atomicReqLimiter=" + atomicReqLimiter);
373    }
374    if (!atomicReadSizeLimiter.isBypass()) {
375      builder.append(" atomicReadSizeLimiter=" + atomicReadSizeLimiter);
376    }
377    if (!atomicWriteSizeLimiter.isBypass()) {
378      builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter);
379    }
380    builder.append(')');
381    return builder.toString();
382  }
383}