/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Default {@link OperationQuota} implementation.
 * <p>
 * Before an operation runs, the read/write size and capacity units it is expected to consume are
 * estimated. That estimate is checked against, and then grabbed from, every {@link QuotaLimiter}.
 * As the operation runs, the actual sizes are accumulated through the {@code add*} methods.
 * {@link #close()} then charges or refunds the difference between the estimate and the actual
 * consumption on every limiter.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DefaultOperationQuota implements OperationQuota {

  // A single scan estimate can consume no more than this proportion of the smallest limiter read
  // limit. This prevents a long-running scan from being estimated at, say, 100MB of IO against a
  // throttle whose read limit is below 100MB, which could never succeed.
  private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;

  /** Limiters consulted for every check; order matters (see the List constructor). */
  protected final List<QuotaLimiter> limiters;
  private final long writeCapacityUnit;
  private final long readCapacityUnit;

  // Smallest read availability reported by the non-bypass limiters during the last check
  // (Long.MAX_VALUE when every limiter is bypassed).
  protected long readAvailable = 0;
  // Estimated quota, grabbed up front in checkQuota().
  protected long writeConsumed = 0;
  protected long readConsumed = 0;
  protected long writeCapacityUnitConsumed = 0;
  protected long readCapacityUnitConsumed = 0;
  // Actual consumed sizes, indexed by OperationType ordinal.
  private final long[] operationSize;
  // Difference between the estimated quota and the real consumed quota, computed in close() to
  // adjust the quota amount. Also used by ExceedOperationQuota, which is a subclass of
  // DefaultOperationQuota.
  protected long writeDiff = 0;
  protected long readDiff = 0;
  protected long writeCapacityUnitDiff = 0;
  protected long readCapacityUnitDiff = 0;
  // When true, reads are accounted by result size; otherwise by block bytes scanned.
  private boolean useResultSizeBytes;
  private long blockSizeBytes;
  // Upper bound for a single block-bytes scan estimate.
  private final long maxScanEstimate;

  /**
   * Creates a quota that honours the configured read accounting mode.
   * @param conf           configuration supplying capacity units and
   *                       {@link OperationQuota#USE_RESULT_SIZE_BYTES}
   * @param blockSizeBytes block size used to estimate block-bytes reads
   * @param limiters       limiters to check, in order
   */
  public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
    final QuotaLimiter... limiters) {
    this(conf, Arrays.asList(limiters));
    this.useResultSizeBytes =
      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
    this.blockSizeBytes = blockSizeBytes;
  }

  /**
   * NOTE: The order matters. It should be something like [user, table, namespace, global]
   * @param conf     configuration supplying the read/write capacity unit sizes
   * @param limiters limiters to check, in order
   */
  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
    this.writeCapacityUnit =
      conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
    this.readCapacityUnit =
      conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
    this.limiters = limiters;
    // Java zero-initialises the array, so no explicit fill is needed.
    this.operationSize = new long[OperationType.values().length];
    // Computed here (not only in the varargs constructor) so quotas built from a List also get a
    // real cap instead of 0, which would clamp every block-bytes scan estimate to nothing.
    long readSizeLimit =
      limiters.stream().mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
    this.maxScanEstimate =
      Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
  }

  @Override
  public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
    updateEstimateConsumeBatchQuota(numWrites, numReads);
    checkQuota(numWrites, numReads);
  }

  @Override
  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
      prevBlockBytesScannedDifference);
    checkQuota(0, 1);
  }

  /**
   * Checks the current estimate against every non-bypass limiter, then grabs it from all limiters.
   * <p>
   * Before each check, the request counts and sizes are clamped to that limiter's own limits.
   * This is presumably so that an estimate larger than a limit cannot make the check
   * unsatisfiable. While checking, {@link #readAvailable} is updated to the smallest read
   * availability reported.
   * @param numWrites number of write requests in the operation
   * @param numReads  number of read requests in the operation
   * @throws RpcThrottlingException if a non-bypass limiter rejects the estimate
   */
  private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
    readAvailable = Long.MAX_VALUE;
    for (final QuotaLimiter limiter : limiters) {
      if (limiter.isBypass()) {
        continue;
      }

      long maxRequestsToEstimate = limiter.getRequestNumLimit();
      long writesToCheck =
        Math.min(numWrites, Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()));
      long readsToCheck =
        Math.min(numReads, Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()));
      long writeSizeToCheck = Math.min(writeConsumed, limiter.getWriteLimit());
      long readSizeToCheck = Math.min(readConsumed, limiter.getReadLimit());

      limiter.checkQuota(writesToCheck, writeSizeToCheck, readsToCheck, readSizeToCheck,
        writeCapacityUnitConsumed, readCapacityUnitConsumed);
      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
    }

    // Only reached when every non-bypass limiter accepted; the full (unclamped) estimate is
    // grabbed from every limiter, bypass ones included.
    for (final QuotaLimiter limiter : limiters) {
      limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
        readCapacityUnitConsumed);
    }
  }

  /**
   * Reconciles the up-front estimate with what the operation actually consumed.
   * <p>
   * The actual write size is the accumulated mutation size. The actual read size is the
   * accumulated result size in result-size mode. Otherwise it is the larger of the current RPC
   * call's block bytes scanned and the result size. The size and capacity-unit differences are
   * stored in the {@code *Diff} fields, and each non-zero difference is applied to every limiter.
   */
  @Override
  public void close() {
    long actualWriteSize = operationSize[OperationType.MUTATE.ordinal()];
    writeDiff = actualWriteSize - writeConsumed;

    long resultSize =
      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
    long actualReadSize;
    if (useResultSizeBytes) {
      actualReadSize = resultSize;
    } else {
      long blockBytesScanned =
        RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
      actualReadSize = Math.max(blockBytesScanned, resultSize);
    }
    readDiff = actualReadSize - readConsumed;

    writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(actualWriteSize, writeConsumed);
    // Use the same actual size as readDiff. The estimate in readConsumed is in block bytes when
    // useResultSizeBytes is false, so reconciling capacity units against result size alone would
    // charge bytes and capacity units against different measurements.
    readCapacityUnitDiff = calculateReadCapacityUnitDiff(actualReadSize, readConsumed);

    for (final QuotaLimiter limiter : limiters) {
      if (writeDiff != 0) {
        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
      }
      if (readDiff != 0) {
        limiter.consumeRead(readDiff, readCapacityUnitDiff);
      }
    }
  }

  /** Returns the smallest read availability reported by non-bypass limiters at the last check. */
  @Override
  public long getReadAvailable() {
    return readAvailable;
  }

  /** Returns the estimated read size grabbed at the last check. */
  @Override
  public long getReadConsumed() {
    return readConsumed;
  }

  @Override
  public void addGetResult(final Result result) {
    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
  }

  @Override
  public void addScanResult(final List<Result> results) {
    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
  }

  @Override
  public void addScanResultCells(final List<Cell> cells) {
    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
  }

  @Override
  public void addMutation(final Mutation mutation) {
    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
  }

  /**
   * Update estimate quota(read/write size/capacityUnits) which will be consumed.
   * <p>
   * Writes are estimated at 100 bytes each. In result-size mode, reads are estimated at 100 bytes
   * each. Otherwise a batch with any reads is estimated at one block ({@code blockSizeBytes}).
   * @param numWrites the number of write requests
   * @param numReads  the number of read requests
   */
  protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);

    if (useResultSizeBytes) {
      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
    } else {
      // assume 1 block required for reads. this is probably a low estimate, which is okay
      readConsumed = numReads > 0 ? blockSizeBytes : 0;
    }

    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
  }

  /**
   * Update estimate quota(read/write size/capacityUnits) which will be consumed.
   * <p>
   * In result-size mode, a scan call is estimated at 1000 bytes. Otherwise the estimate comes from
   * {@link #getScanReadConsumeEstimate} and is capped at {@code maxScanEstimate}. Write estimates
   * are not touched.
   * @param scanRequest                     the scan to be executed
   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
   *                                        scanner
   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
   *                                        calls
   */
  protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
    if (useResultSizeBytes) {
      readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
    } else {
      long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
      readConsumed = Math.min(maxScanEstimate, estimate);
    }

    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
  }

  /**
   * Estimates the block bytes a scanner {@code next} call will read.
   * @param blockSizeBytes                  block size, used for the first call of a scanner
   * @param nextCallSeq                     sequence number of this {@code next} call (0 = first)
   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call so far
   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two calls
   * @return the estimated read size in bytes
   */
  protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
    /*
     * Estimating scan workload is more complicated, and if we severely underestimate workloads then
     * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
     */
    if (nextCallSeq == 0) {
      // start scanners with an optimistic 1 block IO estimate
      // it is better to underestimate a large scan in the beginning
      // than to overestimate, and block, a small scan
      return blockSizeBytes;
    }

    boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
    if (isWorkloadGrowing) {
      // if nextCallSeq > 0 and the workload is growing then our estimate
      // should consider that the workload may continue to increase
      return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
    } else {
      // if nextCallSeq > 0 and the workload is shrinking or flat
      // then our workload has likely plateaued. We can just rely on the existing
      // maxBlockBytesScanned as our estimate in this case.
      return maxBlockBytesScanned;
    }
  }

  /** Returns {@code avgSize * numReqs}, or 0 when there are no requests. */
  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
    if (numReqs > 0) {
      return avgSize * numReqs;
    }
    return 0;
  }

  /** Converts a byte size to write capacity units, rounding up. */
  private long calculateWriteCapacityUnit(final long size) {
    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
  }

  /** Converts a byte size to read capacity units, rounding up. */
  private long calculateReadCapacityUnit(final long size) {
    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
  }

  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
  }

  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
  }
}