/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.util.Arrays;
021import java.util.List;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.client.Mutation;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.ipc.RpcCall;
027import org.apache.hadoop.hbase.ipc.RpcServer;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030
031import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
032
033@InterfaceAudience.Private
034@InterfaceStability.Evolving
035public class DefaultOperationQuota implements OperationQuota {
036
037  // a single scan estimate can consume no more than this proportion of the limiter's limit
038  // this prevents a long-running scan from being estimated at, say, 100MB of IO against
039  // a <100MB/IO throttle (because this would never succeed)
040  private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;
041
042  protected final List<QuotaLimiter> limiters;
043  private final long writeCapacityUnit;
044  private final long readCapacityUnit;
045
046  // the available read/write quota size in bytes
047  protected long readAvailable = 0;
048  // estimated quota
049  protected long writeConsumed = 0;
050  protected long readConsumed = 0;
051  protected long writeCapacityUnitConsumed = 0;
052  protected long readCapacityUnitConsumed = 0;
053  // real consumed quota
054  private final long[] operationSize;
055  // difference between estimated quota and real consumed quota used in close method
056  // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
057  // of DefaultOperationQuota
058  protected long writeDiff = 0;
059  protected long readDiff = 0;
060  protected long writeCapacityUnitDiff = 0;
061  protected long readCapacityUnitDiff = 0;
062  private boolean useResultSizeBytes;
063  private long blockSizeBytes;
064  private long maxScanEstimate;
065
066  public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
067    final QuotaLimiter... limiters) {
068    this(conf, Arrays.asList(limiters));
069    this.useResultSizeBytes =
070      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
071    this.blockSizeBytes = blockSizeBytes;
072    long readSizeLimit =
073      Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
074    maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
075  }
076
077  /**
078   * NOTE: The order matters. It should be something like [user, table, namespace, global]
079   */
080  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
081    this.writeCapacityUnit =
082      conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
083    this.readCapacityUnit =
084      conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
085    this.limiters = limiters;
086    int size = OperationType.values().length;
087    operationSize = new long[size];
088
089    for (int i = 0; i < size; ++i) {
090      operationSize[i] = 0;
091    }
092  }
093
094  @Override
095  public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
096    updateEstimateConsumeBatchQuota(numWrites, numReads);
097    checkQuota(numWrites, numReads);
098  }
099
100  @Override
101  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
102    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
103    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
104      prevBlockBytesScannedDifference);
105    checkQuota(0, 1);
106  }
107
108  private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
109    readAvailable = Long.MAX_VALUE;
110    for (final QuotaLimiter limiter : limiters) {
111      if (limiter.isBypass()) {
112        continue;
113      }
114
115      long maxRequestsToEstimate = limiter.getRequestNumLimit();
116      long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
117      long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());
118      long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit());
119      long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit());
120
121      limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
122        Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
123        Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
124        readCapacityUnitConsumed);
125      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
126    }
127
128    for (final QuotaLimiter limiter : limiters) {
129      limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
130        readCapacityUnitConsumed);
131    }
132  }
133
134  @Override
135  public void close() {
136    // Adjust the quota consumed for the specified operation
137    writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
138
139    long resultSize =
140      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
141    if (useResultSizeBytes) {
142      readDiff = resultSize - readConsumed;
143    } else {
144      long blockBytesScanned =
145        RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
146      readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
147    }
148
149    writeCapacityUnitDiff =
150      calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
151    readCapacityUnitDiff = calculateReadCapacityUnitDiff(
152      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
153      readConsumed);
154
155    for (final QuotaLimiter limiter : limiters) {
156      if (writeDiff != 0) {
157        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
158      }
159      if (readDiff != 0) {
160        limiter.consumeRead(readDiff, readCapacityUnitDiff);
161      }
162    }
163  }
164
165  @Override
166  public long getReadAvailable() {
167    return readAvailable;
168  }
169
170  @Override
171  public long getReadConsumed() {
172    return readConsumed;
173  }
174
175  @Override
176  public void addGetResult(final Result result) {
177    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
178  }
179
180  @Override
181  public void addScanResult(final List<Result> results) {
182    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
183  }
184
185  @Override
186  public void addScanResultCells(final List<Cell> cells) {
187    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
188  }
189
190  @Override
191  public void addMutation(final Mutation mutation) {
192    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
193  }
194
195  /**
196   * Update estimate quota(read/write size/capacityUnits) which will be consumed
197   * @param numWrites the number of write requests
198   * @param numReads  the number of read requests
199   */
200  protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
201    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
202
203    if (useResultSizeBytes) {
204      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
205    } else {
206      // assume 1 block required for reads. this is probably a low estimate, which is okay
207      readConsumed = numReads > 0 ? blockSizeBytes : 0;
208    }
209
210    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
211    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
212  }
213
214  /**
215   * Update estimate quota(read/write size/capacityUnits) which will be consumed
216   * @param scanRequest                     the scan to be executed
217   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
218   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
219   *                                        scanner
220   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
221   *                                        calls
222   */
223  protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
224    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
225    if (useResultSizeBytes) {
226      readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
227    } else {
228      long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
229        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
230      readConsumed = Math.min(maxScanEstimate, estimate);
231    }
232
233    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
234  }
235
236  protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
237    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
238    /*
239     * Estimating scan workload is more complicated, and if we severely underestimate workloads then
240     * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
241     */
242    if (nextCallSeq == 0) {
243      // start scanners with an optimistic 1 block IO estimate
244      // it is better to underestimate a large scan in the beginning
245      // than to overestimate, and block, a small scan
246      return blockSizeBytes;
247    }
248
249    boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
250    if (isWorkloadGrowing) {
251      // if nextCallSeq > 0 and the workload is growing then our estimate
252      // should consider that the workload may continue to increase
253      return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
254    } else {
255      // if nextCallSeq > 0 and the workload is shrinking or flat
256      // then our workload has likely plateaued. We can just rely on the existing
257      // maxBlockBytesScanned as our estimate in this case.
258      return maxBlockBytesScanned;
259    }
260  }
261
262  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
263    if (numReqs > 0) {
264      return avgSize * numReqs;
265    }
266    return 0;
267  }
268
269  private long calculateWriteCapacityUnit(final long size) {
270    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
271  }
272
273  private long calculateReadCapacityUnit(final long size) {
274    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
275  }
276
277  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
278    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
279  }
280
281  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
282    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
283  }
284}