001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.util.Arrays;
021import java.util.List;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.client.Mutation;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.ipc.RpcCall;
028import org.apache.hadoop.hbase.ipc.RpcServer;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.yetus.audience.InterfaceStability;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
034
035@InterfaceAudience.Private
036@InterfaceStability.Evolving
037public class DefaultOperationQuota implements OperationQuota {
038
  // A single scan estimate can consume no more than this proportion of the limiter's limit.
  // This prevents a long-running scan from being estimated at, say, 100MB of IO against
  // a throttle that allows less than 100MB of IO, because such a request could never succeed.
042  private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;
043
044  protected final List<QuotaLimiter> limiters;
045  private final long writeCapacityUnit;
046  private final long readCapacityUnit;
047
  // the available read quota size in bytes
049  protected long readAvailable = 0;
050
051  // The estimated handler usage time in ms for a request based on
052  // the number of requests per second and the number of handler threads
053  private final long estimatedHandlerUsagePerReq;
054
055  // estimated quota
056  protected long writeConsumed = 0;
057  protected long readConsumed = 0;
058  protected long writeCapacityUnitConsumed = 0;
059  protected long readCapacityUnitConsumed = 0;
060  protected long handlerUsageTimeConsumed = 0;
061
062  // real consumed quota
063  private final long[] operationSize;
064  // difference between estimated quota and real consumed quota used in close method
065  // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
066  // of DefaultOperationQuota
067  protected long writeDiff = 0;
068  protected long readDiff = 0;
069  protected long writeCapacityUnitDiff = 0;
070  protected long readCapacityUnitDiff = 0;
071  protected long handlerUsageTimeDiff = 0;
072  private boolean useResultSizeBytes;
073  private long blockSizeBytes;
074  private long maxScanEstimate;
075  private boolean isAtomic = false;
076
077  public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
078    final double requestsPerSecond, final QuotaLimiter... limiters) {
079    this(conf, requestsPerSecond, Arrays.asList(limiters));
080    this.useResultSizeBytes =
081      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
082    this.blockSizeBytes = blockSizeBytes;
083    long readSizeLimit =
084      Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
085    maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
086  }
087
088  /**
089   * NOTE: The order matters. It should be something like [user, table, namespace, global]
090   */
091  public DefaultOperationQuota(final Configuration conf, final double requestsPerSecond,
092    final List<QuotaLimiter> limiters) {
093    this.writeCapacityUnit =
094      conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
095    this.readCapacityUnit =
096      conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
097    this.limiters = limiters;
098    int numHandlerThreads = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
099      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
100    this.estimatedHandlerUsagePerReq =
101      calculateHandlerUsageTimeEstimate(requestsPerSecond, numHandlerThreads);
102
103    int size = OperationType.values().length;
104    operationSize = new long[size];
105    for (int i = 0; i < size; ++i) {
106      operationSize[i] = 0;
107    }
108  }
109
110  @Override
111  public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic)
112    throws RpcThrottlingException {
113    updateEstimateConsumeBatchQuota(numWrites, numReads);
114    checkQuota(numWrites, numReads, isAtomic);
115  }
116
117  @Override
118  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
119    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
120    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
121      prevBlockBytesScannedDifference);
122    checkQuota(0, 1, false);
123  }
124
125  private void checkQuota(long numWrites, long numReads, boolean isAtomic)
126    throws RpcThrottlingException {
127    if (isAtomic) {
128      // Remember this flag for later use in close()
129      this.isAtomic = true;
130    }
131    readAvailable = Long.MAX_VALUE;
132    for (final QuotaLimiter limiter : limiters) {
133      if (limiter.isBypass()) {
134        continue;
135      }
136
137      long maxRequestsToEstimate = limiter.getRequestNumLimit();
138      long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
139      long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());
140      long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit());
141      long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit());
142
143      limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
144        Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
145        Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
146        readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
147      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
148    }
149
150    for (final QuotaLimiter limiter : limiters) {
151      limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
152        readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
153    }
154  }
155
156  @Override
157  public void close() {
158    // Adjust the quota consumed for the specified operation
159    writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
160
161    long resultSize =
162      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
163    if (useResultSizeBytes) {
164      readDiff = resultSize - readConsumed;
165    } else {
166      long blockBytesScanned =
167        RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
168      readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
169    }
170    writeCapacityUnitDiff =
171      calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
172    readCapacityUnitDiff = calculateReadCapacityUnitDiff(
173      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
174      readConsumed);
175    handlerUsageTimeDiff = calculateHandlerUsageMsDiff();
176
177    for (final QuotaLimiter limiter : limiters) {
178      if (writeDiff != 0) {
179        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic);
180      }
181      if (readDiff != 0) {
182        limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic);
183      }
184      if (handlerUsageTimeDiff != 0) {
185        limiter.consumeTime(handlerUsageTimeDiff);
186      }
187    }
188  }
189
190  @Override
191  public long getReadAvailable() {
192    return readAvailable;
193  }
194
195  @Override
196  public long getReadConsumed() {
197    return readConsumed;
198  }
199
200  @Override
201  public void addGetResult(final Result result) {
202    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
203  }
204
205  @Override
206  public void addScanResult(final List<Result> results) {
207    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
208  }
209
210  @Override
211  public void addScanResultCells(final List<Cell> cells) {
212    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
213  }
214
215  @Override
216  public void addMutation(final Mutation mutation) {
217    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
218  }
219
220  /**
221   * Update estimate quota(read/write size/capacityUnits) which will be consumed
222   * @param numWrites the number of write requests
223   * @param numReads  the number of read requests
224   */
225  protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
226    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
227
228    if (useResultSizeBytes) {
229      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
230    } else {
231      // assume 1 block required for reads. this is probably a low estimate, which is okay
232      readConsumed = numReads > 0 ? blockSizeBytes : 0;
233    }
234
235    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
236    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
237
238    handlerUsageTimeConsumed = (numReads + numWrites) * estimatedHandlerUsagePerReq;
239  }
240
241  /**
242   * Update estimate quota(read/write size/capacityUnits) which will be consumed
243   * @param scanRequest                     the scan to be executed
244   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
245   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
246   *                                        scanner
247   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
248   *                                        calls
249   */
250  protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
251    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
252    if (useResultSizeBytes) {
253      readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
254    } else {
255      long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
256        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
257      readConsumed = Math.min(maxScanEstimate, estimate);
258    }
259
260    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
261    handlerUsageTimeConsumed = estimatedHandlerUsagePerReq;
262  }
263
264  protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
265    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
266    /*
267     * Estimating scan workload is more complicated, and if we severely underestimate workloads then
268     * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
269     */
270    if (nextCallSeq == 0) {
271      // start scanners with an optimistic 1 block IO estimate
272      // it is better to underestimate a large scan in the beginning
273      // than to overestimate, and block, a small scan
274      return blockSizeBytes;
275    }
276
277    boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
278    if (isWorkloadGrowing) {
279      // if nextCallSeq > 0 and the workload is growing then our estimate
280      // should consider that the workload may continue to increase
281      return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
282    } else {
283      // if nextCallSeq > 0 and the workload is shrinking or flat
284      // then our workload has likely plateaued. We can just rely on the existing
285      // maxBlockBytesScanned as our estimate in this case.
286      return maxBlockBytesScanned;
287    }
288  }
289
290  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
291    if (numReqs > 0) {
292      return avgSize * numReqs;
293    }
294    return 0;
295  }
296
297  private long calculateWriteCapacityUnit(final long size) {
298    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
299  }
300
301  private long calculateReadCapacityUnit(final long size) {
302    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
303  }
304
305  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
306    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
307  }
308
309  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
310    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
311  }
312
313  private long calculateHandlerUsageTimeEstimate(final double requestsPerSecond,
314    final int numHandlerThreads) {
315    if (requestsPerSecond <= numHandlerThreads) {
316      // If less than 1 request per second per handler thread, then we use the number of handler
317      // threads as a baseline to avoid incorrect estimations when the number of requests is very
318      // low.
319      return numHandlerThreads;
320    } else {
321      double requestsPerMillisecond = Math.ceil(requestsPerSecond / 1000);
322      // We don't ever want zero here
323      return Math.max((long) requestsPerMillisecond, 1L);
324    }
325  }
326
327  private long calculateHandlerUsageMsDiff() {
328    long currentTime = EnvironmentEdgeManager.currentTime();
329    long startTime = RpcServer.getCurrentCall().map(RpcCall::getStartTime).orElse(currentTime);
330    long timeElapsed = currentTime - startTime;
331    return handlerUsageTimeConsumed - timeElapsed;
332  }
333}