001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.yetus.audience.InterfaceAudience;
022import org.apache.yetus.audience.InterfaceStability;
023
024import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
025import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
026import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
027
028/**
029 * Simple time based limiter that checks the quota Throttle
030 */
031@InterfaceAudience.Private
032@InterfaceStability.Evolving
033public class TimeBasedLimiter implements QuotaLimiter {
034  private RateLimiter reqsLimiter = null;
035  private RateLimiter reqSizeLimiter = null;
036  private RateLimiter writeReqsLimiter = null;
037  private RateLimiter writeSizeLimiter = null;
038  private RateLimiter readReqsLimiter = null;
039  private RateLimiter readSizeLimiter = null;
040  private RateLimiter reqCapacityUnitLimiter = null;
041  private RateLimiter writeCapacityUnitLimiter = null;
042  private RateLimiter readCapacityUnitLimiter = null;
043  private RateLimiter atomicReqLimiter = null;
044  private RateLimiter atomicReadSizeLimiter = null;
045  private RateLimiter atomicWriteSizeLimiter = null;
046  private RateLimiter reqHandlerUsageTimeLimiter = null;
047
048  private TimeBasedLimiter(Configuration conf) {
049    String limiterClassName =
050      conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
051        .getName();
052    if (FixedIntervalRateLimiter.class.getName().equals(limiterClassName)) {
053      long refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
054        RateLimiter.DEFAULT_TIME_UNIT);
055      reqsLimiter = new FixedIntervalRateLimiter(refillInterval);
056      reqSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
057      writeReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
058      writeSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
059      readReqsLimiter = new FixedIntervalRateLimiter(refillInterval);
060      readSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
061      reqCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
062      writeCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
063      readCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval);
064      atomicReqLimiter = new FixedIntervalRateLimiter(refillInterval);
065      atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
066      atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
067      reqHandlerUsageTimeLimiter = new FixedIntervalRateLimiter(refillInterval);
068    } else if (FeedbackAdaptiveRateLimiter.class.getName().equals(limiterClassName)) {
069      FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory feedbackLimiterFactory =
070        new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
071      reqsLimiter = feedbackLimiterFactory.create();
072      reqSizeLimiter = feedbackLimiterFactory.create();
073      writeReqsLimiter = feedbackLimiterFactory.create();
074      writeSizeLimiter = feedbackLimiterFactory.create();
075      readReqsLimiter = feedbackLimiterFactory.create();
076      readSizeLimiter = feedbackLimiterFactory.create();
077      reqCapacityUnitLimiter = feedbackLimiterFactory.create();
078      writeCapacityUnitLimiter = feedbackLimiterFactory.create();
079      readCapacityUnitLimiter = feedbackLimiterFactory.create();
080      atomicReqLimiter = feedbackLimiterFactory.create();
081      atomicReadSizeLimiter = feedbackLimiterFactory.create();
082      atomicWriteSizeLimiter = feedbackLimiterFactory.create();
083      reqHandlerUsageTimeLimiter = feedbackLimiterFactory.create();
084    } else {
085      reqsLimiter = new AverageIntervalRateLimiter();
086      reqSizeLimiter = new AverageIntervalRateLimiter();
087      writeReqsLimiter = new AverageIntervalRateLimiter();
088      writeSizeLimiter = new AverageIntervalRateLimiter();
089      readReqsLimiter = new AverageIntervalRateLimiter();
090      readSizeLimiter = new AverageIntervalRateLimiter();
091      reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
092      writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
093      readCapacityUnitLimiter = new AverageIntervalRateLimiter();
094      atomicReqLimiter = new AverageIntervalRateLimiter();
095      atomicReadSizeLimiter = new AverageIntervalRateLimiter();
096      atomicWriteSizeLimiter = new AverageIntervalRateLimiter();
097      reqHandlerUsageTimeLimiter = new AverageIntervalRateLimiter();
098    }
099  }
100
101  static QuotaLimiter fromThrottle(Configuration conf, final Throttle throttle) {
102    TimeBasedLimiter limiter = new TimeBasedLimiter(conf);
103    boolean isBypass = true;
104    if (throttle.hasReqNum()) {
105      setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
106      isBypass = false;
107    }
108
109    if (throttle.hasReqSize()) {
110      setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize());
111      isBypass = false;
112    }
113
114    if (throttle.hasWriteNum()) {
115      setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum());
116      isBypass = false;
117    }
118
119    if (throttle.hasWriteSize()) {
120      setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize());
121      isBypass = false;
122    }
123
124    if (throttle.hasReadNum()) {
125      setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum());
126      isBypass = false;
127    }
128
129    if (throttle.hasReadSize()) {
130      setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
131      isBypass = false;
132    }
133
134    if (throttle.hasReqCapacityUnit()) {
135      setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
136      isBypass = false;
137    }
138
139    if (throttle.hasWriteCapacityUnit()) {
140      setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
141      isBypass = false;
142    }
143
144    if (throttle.hasReadCapacityUnit()) {
145      setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
146      isBypass = false;
147    }
148
149    if (throttle.hasAtomicReqNum()) {
150      setFromTimedQuota(limiter.atomicReqLimiter, throttle.getAtomicReqNum());
151      isBypass = false;
152    }
153
154    if (throttle.hasAtomicReadSize()) {
155      setFromTimedQuota(limiter.atomicReadSizeLimiter, throttle.getAtomicReadSize());
156      isBypass = false;
157    }
158
159    if (throttle.hasAtomicWriteSize()) {
160      setFromTimedQuota(limiter.atomicWriteSizeLimiter, throttle.getAtomicWriteSize());
161      isBypass = false;
162    }
163
164    if (throttle.hasReqHandlerUsageMs()) {
165      setFromTimedQuota(limiter.reqHandlerUsageTimeLimiter, throttle.getReqHandlerUsageMs());
166      isBypass = false;
167    }
168
169    return isBypass ? NoopQuotaLimiter.get() : limiter;
170  }
171
172  public void update(final TimeBasedLimiter other) {
173    reqsLimiter.update(other.reqsLimiter);
174    reqSizeLimiter.update(other.reqSizeLimiter);
175    writeReqsLimiter.update(other.writeReqsLimiter);
176    writeSizeLimiter.update(other.writeSizeLimiter);
177    readReqsLimiter.update(other.readReqsLimiter);
178    readSizeLimiter.update(other.readSizeLimiter);
179    reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
180    writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
181    readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
182    atomicReqLimiter.update(other.atomicReqLimiter);
183    atomicReadSizeLimiter.update(other.atomicReadSizeLimiter);
184    atomicWriteSizeLimiter.update(other.atomicWriteSizeLimiter);
185    reqHandlerUsageTimeLimiter.update(other.reqHandlerUsageTimeLimiter);
186  }
187
188  private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
189    limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()));
190  }
191
192  @Override
193  public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
194    long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit,
195    boolean isAtomic, long estimatedReqHandlerUsageTimeMs) throws RpcThrottlingException {
196    long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
197    if (waitInterval > 0) {
198      RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
199    }
200    waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
201    if (waitInterval > 0) {
202      RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
203    }
204    waitInterval = reqCapacityUnitLimiter
205      .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
206    if (waitInterval > 0) {
207      RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
208    }
209    if (isAtomic) {
210      waitInterval = atomicReqLimiter.getWaitIntervalMs(writeReqs + readReqs);
211      if (waitInterval > 0) {
212        RpcThrottlingException.throwAtomicRequestNumberExceeded(waitInterval);
213      }
214    }
215
216    if (estimateWriteSize > 0) {
217      waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
218      if (waitInterval > 0) {
219        RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
220      }
221      waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
222      if (waitInterval > 0) {
223        RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
224      }
225      waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
226      if (waitInterval > 0) {
227        RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
228      }
229      if (isAtomic) {
230        waitInterval = atomicWriteSizeLimiter.getWaitIntervalMs(writeReqs);
231        if (waitInterval > 0) {
232          RpcThrottlingException.throwAtomicWriteSizeExceeded(waitInterval);
233        }
234      }
235    }
236
237    if (estimateReadSize > 0) {
238      waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
239      if (waitInterval > 0) {
240        RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
241      }
242      waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
243      if (waitInterval > 0) {
244        RpcThrottlingException.throwReadSizeExceeded(waitInterval);
245      }
246      waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
247      if (waitInterval > 0) {
248        RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
249      }
250      if (isAtomic) {
251        waitInterval = atomicReadSizeLimiter.getWaitIntervalMs(writeReqs + readReqs);
252        if (waitInterval > 0) {
253          RpcThrottlingException.throwAtomicReadSizeExceeded(waitInterval);
254        }
255      }
256    }
257    waitInterval = reqHandlerUsageTimeLimiter.getWaitIntervalMs(estimatedReqHandlerUsageTimeMs);
258    if (waitInterval > 0) {
259      RpcThrottlingException.throwRequestHandlerUsageTimeExceeded(waitInterval);
260    }
261  }
262
263  @Override
264  public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
265    long writeCapacityUnit, long readCapacityUnit, boolean isAtomic,
266    long estimateHandlerThreadUsageMs) {
267    assert writeSize != 0 || readSize != 0;
268
269    reqsLimiter.consume(writeReqs + readReqs);
270    reqSizeLimiter.consume(writeSize + readSize);
271
272    if (writeSize > 0) {
273      writeReqsLimiter.consume(writeReqs);
274      writeSizeLimiter.consume(writeSize);
275    }
276    if (readSize > 0) {
277      readReqsLimiter.consume(readReqs);
278      readSizeLimiter.consume(readSize);
279    }
280    if (writeCapacityUnit > 0) {
281      reqCapacityUnitLimiter.consume(writeCapacityUnit);
282      writeCapacityUnitLimiter.consume(writeCapacityUnit);
283    }
284    if (readCapacityUnit > 0) {
285      reqCapacityUnitLimiter.consume(readCapacityUnit);
286      readCapacityUnitLimiter.consume(readCapacityUnit);
287    }
288    if (isAtomic) {
289      atomicReqLimiter.consume(writeReqs + readReqs);
290      if (readSize > 0) {
291        atomicReadSizeLimiter.consume(readSize);
292      }
293      if (writeSize > 0) {
294        atomicWriteSizeLimiter.consume(writeSize);
295      }
296    }
297    reqHandlerUsageTimeLimiter.consume(estimateHandlerThreadUsageMs);
298  }
299
300  @Override
301  public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) {
302    reqSizeLimiter.consume(size);
303    writeSizeLimiter.consume(size);
304    reqCapacityUnitLimiter.consume(capacityUnit);
305    writeCapacityUnitLimiter.consume(capacityUnit);
306    if (isAtomic) {
307      atomicWriteSizeLimiter.consume(size);
308    }
309  }
310
311  @Override
312  public void consumeRead(final long size, long capacityUnit, boolean isAtomic) {
313    reqSizeLimiter.consume(size);
314    readSizeLimiter.consume(size);
315    reqCapacityUnitLimiter.consume(capacityUnit);
316    readCapacityUnitLimiter.consume(capacityUnit);
317    if (isAtomic) {
318      atomicReadSizeLimiter.consume(size);
319    }
320  }
321
322  @Override
323  public void consumeTime(final long handlerMillisUsed) {
324    reqHandlerUsageTimeLimiter.consume(handlerMillisUsed);
325  }
326
327  @Override
328  public boolean isBypass() {
329    return false;
330  }
331
332  @Override
333  public long getWriteAvailable() {
334    return writeSizeLimiter.getAvailable();
335  }
336
337  @Override
338  public long getRequestNumLimit() {
339    long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();
340
341    if (readAndWriteLimit < 0) { // handle overflow
342      readAndWriteLimit = Long.MAX_VALUE;
343    }
344
345    return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
346  }
347
348  @Override
349  public long getReadNumLimit() {
350    return readReqsLimiter.getLimit();
351  }
352
353  @Override
354  public long getWriteNumLimit() {
355    return writeReqsLimiter.getLimit();
356  }
357
358  @Override
359  public long getReadAvailable() {
360    return readSizeLimiter.getAvailable();
361  }
362
363  @Override
364  public long getReadLimit() {
365    return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
366  }
367
368  @Override
369  public long getWriteLimit() {
370    return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
371  }
372
373  @Override
374  public String toString() {
375    StringBuilder builder = new StringBuilder();
376    builder.append("TimeBasedLimiter(");
377    if (!reqsLimiter.isBypass()) {
378      builder.append("reqs=" + reqsLimiter);
379    }
380    if (!reqSizeLimiter.isBypass()) {
381      builder.append(" resSize=" + reqSizeLimiter);
382    }
383    if (!writeReqsLimiter.isBypass()) {
384      builder.append(" writeReqs=" + writeReqsLimiter);
385    }
386    if (!writeSizeLimiter.isBypass()) {
387      builder.append(" writeSize=" + writeSizeLimiter);
388    }
389    if (!readReqsLimiter.isBypass()) {
390      builder.append(" readReqs=" + readReqsLimiter);
391    }
392    if (!readSizeLimiter.isBypass()) {
393      builder.append(" readSize=" + readSizeLimiter);
394    }
395    if (!reqCapacityUnitLimiter.isBypass()) {
396      builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
397    }
398    if (!writeCapacityUnitLimiter.isBypass()) {
399      builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
400    }
401    if (!readCapacityUnitLimiter.isBypass()) {
402      builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
403    }
404    if (!atomicReqLimiter.isBypass()) {
405      builder.append(" atomicReqLimiter=" + atomicReqLimiter);
406    }
407    if (!atomicReadSizeLimiter.isBypass()) {
408      builder.append(" atomicReadSizeLimiter=" + atomicReadSizeLimiter);
409    }
410    if (!atomicWriteSizeLimiter.isBypass()) {
411      builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter);
412    }
413    if (!reqHandlerUsageTimeLimiter.isBypass()) {
414      builder.append(" reqHandlerUsageTimeLimiter=" + reqHandlerUsageTimeLimiter);
415    }
416    builder.append(')');
417    return builder.toString();
418  }
419}