001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.util.Arrays; 021import java.util.List; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.Cell; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.client.Mutation; 026import org.apache.hadoop.hbase.client.Result; 027import org.apache.hadoop.hbase.ipc.RpcCall; 028import org.apache.hadoop.hbase.ipc.RpcServer; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.yetus.audience.InterfaceStability; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 034 035@InterfaceAudience.Private 036@InterfaceStability.Evolving 037public class DefaultOperationQuota implements OperationQuota { 038 039 // a single scan estimate can consume no more than this proportion of the limiter's limit 040 // this prevents a long-running scan from being estimated at, say, 100MB of IO against 041 // a <100MB/IO throttle (because this would never succeed) 042 private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9; 043 044 protected final List<QuotaLimiter> limiters; 045 private final long writeCapacityUnit; 046 private final long readCapacityUnit; 047 048 // the available read/write quota size in bytes 049 protected long readAvailable = 0; 050 051 // The estimated handler usage time in ms for a request based on 052 // the number of requests per second and the number of handler threads 053 private final long estimatedHandlerUsagePerReq; 054 055 // estimated quota 056 protected long writeConsumed = 0; 057 protected long readConsumed = 0; 058 protected long writeCapacityUnitConsumed = 0; 059 protected long readCapacityUnitConsumed = 0; 060 protected long handlerUsageTimeConsumed = 0; 061 062 // real consumed quota 063 private final long[] operationSize; 064 // difference between estimated quota and real consumed quota used in close method 065 // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass 066 // of DefaultOperationQuota 067 protected long writeDiff = 0; 068 protected long readDiff = 0; 069 protected long writeCapacityUnitDiff = 0; 070 protected long readCapacityUnitDiff = 0; 071 protected long handlerUsageTimeDiff = 0; 072 private boolean useResultSizeBytes; 073 private long blockSizeBytes; 074 private long maxScanEstimate; 075 private boolean isAtomic = false; 076 077 public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, 078 final double requestsPerSecond, final QuotaLimiter... limiters) { 079 this(conf, requestsPerSecond, Arrays.asList(limiters)); 080 this.useResultSizeBytes = 081 conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); 082 this.blockSizeBytes = blockSizeBytes; 083 long readSizeLimit = 084 Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE); 085 maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit); 086 } 087 088 /** 089 * NOTE: The order matters. It should be something like [user, table, namespace, global] 090 */ 091 public DefaultOperationQuota(final Configuration conf, final double requestsPerSecond, 092 final List<QuotaLimiter> limiters) { 093 this.writeCapacityUnit = 094 conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT); 095 this.readCapacityUnit = 096 conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT); 097 this.limiters = limiters; 098 int numHandlerThreads = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 099 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); 100 this.estimatedHandlerUsagePerReq = 101 calculateHandlerUsageTimeEstimate(requestsPerSecond, numHandlerThreads); 102 103 int size = OperationType.values().length; 104 operationSize = new long[size]; 105 for (int i = 0; i < size; ++i) { 106 operationSize[i] = 0; 107 } 108 } 109 110 @Override 111 public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) 112 throws RpcThrottlingException { 113 updateEstimateConsumeBatchQuota(numWrites, numReads); 114 checkQuota(numWrites, numReads, isAtomic); 115 } 116 117 @Override 118 public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, 119 long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { 120 updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, 121 prevBlockBytesScannedDifference); 122 checkQuota(0, 1, false); 123 } 124 125 private void checkQuota(long numWrites, long numReads, boolean isAtomic) 126 throws RpcThrottlingException { 127 if (isAtomic) { 128 // Remember this flag for later use in close() 129 this.isAtomic = true; 130 } 131 readAvailable = Long.MAX_VALUE; 132 for (final QuotaLimiter limiter : limiters) { 133 if (limiter.isBypass()) { 134 continue; 135 } 136 137 long maxRequestsToEstimate = limiter.getRequestNumLimit(); 138 long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()); 139 long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()); 140 long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit()); 141 long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit()); 142 143 limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), 144 Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), 145 Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, 146 readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed); 147 readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); 148 } 149 150 for (final QuotaLimiter limiter : limiters) { 151 limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, 152 readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed); 153 } 154 } 155 156 @Override 157 public void close() { 158 // Adjust the quota consumed for the specified operation 159 writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; 160 161 long resultSize = 162 operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; 163 if (useResultSizeBytes) { 164 readDiff = resultSize - readConsumed; 165 } else { 166 long blockBytesScanned = 167 RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L); 168 readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; 169 } 170 writeCapacityUnitDiff = 171 calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed); 172 readCapacityUnitDiff = calculateReadCapacityUnitDiff( 173 operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()], 174 readConsumed); 175 handlerUsageTimeDiff = calculateHandlerUsageMsDiff(); 176 177 for (final QuotaLimiter limiter : limiters) { 178 if (writeDiff != 0) { 179 limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic); 180 } 181 if (readDiff != 0) { 182 limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic); 183 } 184 if (handlerUsageTimeDiff != 0) { 185 limiter.consumeTime(handlerUsageTimeDiff); 186 } 187 } 188 } 189 190 @Override 191 public long getReadAvailable() { 192 return readAvailable; 193 } 194 195 @Override 196 public long getReadConsumed() { 197 return readConsumed; 198 } 199 200 @Override 201 public void addGetResult(final Result result) { 202 operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result); 203 } 204 205 @Override 206 public void addScanResult(final List<Result> results) { 207 operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results); 208 } 209 210 @Override 211 public void addScanResultCells(final List<Cell> cells) { 212 operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells); 213 } 214 215 @Override 216 public void addMutation(final Mutation mutation) { 217 operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); 218 } 219 220 /** 221 * Update estimate quota(read/write size/capacityUnits) which will be consumed 222 * @param numWrites the number of write requests 223 * @param numReads the number of read requests 224 */ 225 protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) { 226 writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); 227 228 if (useResultSizeBytes) { 229 readConsumed = estimateConsume(OperationType.GET, numReads, 100); 230 } else { 231 // assume 1 block required for reads. this is probably a low estimate, which is okay 232 readConsumed = numReads > 0 ? blockSizeBytes : 0; 233 } 234 235 writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); 236 readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); 237 238 handlerUsageTimeConsumed = (numReads + numWrites) * estimatedHandlerUsagePerReq; 239 } 240 241 /** 242 * Update estimate quota(read/write size/capacityUnits) which will be consumed 243 * @param scanRequest the scan to be executed 244 * @param maxScannerResultSize the maximum bytes to be returned by the scanner 245 * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the 246 * scanner 247 * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next 248 * calls 249 */ 250 protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest, 251 long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { 252 if (useResultSizeBytes) { 253 readConsumed = estimateConsume(OperationType.SCAN, 1, 1000); 254 } else { 255 long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(), 256 maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); 257 readConsumed = Math.min(maxScanEstimate, estimate); 258 } 259 260 readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); 261 handlerUsageTimeConsumed = estimatedHandlerUsagePerReq; 262 } 263 264 protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq, 265 long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { 266 /* 267 * Estimating scan workload is more complicated, and if we severely underestimate workloads then 268 * throttled clients will exhaust retries too quickly, and could saturate the RPC layer 269 */ 270 if (nextCallSeq == 0) { 271 // start scanners with an optimistic 1 block IO estimate 272 // it is better to underestimate a large scan in the beginning 273 // than to overestimate, and block, a small scan 274 return blockSizeBytes; 275 } 276 277 boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes; 278 if (isWorkloadGrowing) { 279 // if nextCallSeq > 0 and the workload is growing then our estimate 280 // should consider that the workload may continue to increase 281 return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned); 282 } else { 283 // if nextCallSeq > 0 and the workload is shrinking or flat 284 // then our workload has likely plateaued. We can just rely on the existing 285 // maxBlockBytesScanned as our estimate in this case. 286 return maxBlockBytesScanned; 287 } 288 } 289 290 private long estimateConsume(final OperationType type, int numReqs, long avgSize) { 291 if (numReqs > 0) { 292 return avgSize * numReqs; 293 } 294 return 0; 295 } 296 297 private long calculateWriteCapacityUnit(final long size) { 298 return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit); 299 } 300 301 private long calculateReadCapacityUnit(final long size) { 302 return (long) Math.ceil(size * 1.0 / this.readCapacityUnit); 303 } 304 305 private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) { 306 return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize); 307 } 308 309 private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) { 310 return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize); 311 } 312 313 private long calculateHandlerUsageTimeEstimate(final double requestsPerSecond, 314 final int numHandlerThreads) { 315 if (requestsPerSecond <= numHandlerThreads) { 316 // If less than 1 request per second per handler thread, then we use the number of handler 317 // threads as a baseline to avoid incorrect estimations when the number of requests is very 318 // low. 319 return numHandlerThreads; 320 } else { 321 double requestsPerMillisecond = Math.ceil(requestsPerSecond / 1000); 322 // We don't ever want zero here 323 return Math.max((long) requestsPerMillisecond, 1L); 324 } 325 } 326 327 private long calculateHandlerUsageMsDiff() { 328 long currentTime = EnvironmentEdgeManager.currentTime(); 329 long startTime = RpcServer.getCurrentCall().map(RpcCall::getStartTime).orElse(currentTime); 330 long timeElapsed = currentTime - startTime; 331 return handlerUsageTimeConsumed - timeElapsed; 332 } 333}