001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.util.Arrays; 022import java.util.List; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.yetus.audience.InterfaceStability; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029import org.apache.hadoop.hbase.client.Mutation; 030import org.apache.hadoop.hbase.client.Result; 031 032@InterfaceAudience.Private 033@InterfaceStability.Evolving 034public class DefaultOperationQuota implements OperationQuota { 035 private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class); 036 037 protected final List<QuotaLimiter> limiters; 038 private final long writeCapacityUnit; 039 private final long readCapacityUnit; 040 041 // the available read/write quota size in bytes 042 protected long writeAvailable = 0; 043 protected long readAvailable = 0; 044 // estimated quota 045 protected long writeConsumed = 0; 046 protected long readConsumed = 0; 047 protected long writeCapacityUnitConsumed = 0; 048 protected long readCapacityUnitConsumed = 0; 049 // real consumed quota 050 private final long[] operationSize; 051 // difference between estimated quota and real consumed quota used in close method 052 // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass 053 // of DefaultOperationQuota 054 protected long writeDiff = 0; 055 protected long readDiff = 0; 056 protected long writeCapacityUnitDiff = 0; 057 protected long readCapacityUnitDiff = 0; 058 059 public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) { 060 this(conf, Arrays.asList(limiters)); 061 } 062 063 /** 064 * NOTE: The order matters. It should be something like [user, table, namespace, global] 065 */ 066 public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) { 067 this.writeCapacityUnit = 068 conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT); 069 this.readCapacityUnit = 070 conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT); 071 this.limiters = limiters; 072 int size = OperationType.values().length; 073 operationSize = new long[size]; 074 075 for (int i = 0; i < size; ++i) { 076 operationSize[i] = 0; 077 } 078 } 079 080 @Override 081 public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { 082 updateEstimateConsumeQuota(numWrites, numReads, numScans); 083 084 writeAvailable = Long.MAX_VALUE; 085 readAvailable = Long.MAX_VALUE; 086 for (final QuotaLimiter limiter : limiters) { 087 if (limiter.isBypass()) continue; 088 089 limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, 090 writeCapacityUnitConsumed, readCapacityUnitConsumed); 091 readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); 092 writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); 093 } 094 095 for (final QuotaLimiter limiter : limiters) { 096 limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, 097 writeCapacityUnitConsumed, readCapacityUnitConsumed); 098 } 099 } 100 101 @Override 102 public void close() { 103 // Adjust the quota consumed for the specified operation 104 writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; 105 readDiff = operationSize[OperationType.GET.ordinal()] 106 + operationSize[OperationType.SCAN.ordinal()] - readConsumed; 107 writeCapacityUnitDiff = calculateWriteCapacityUnitDiff( 108 operationSize[OperationType.MUTATE.ordinal()], writeConsumed); 109 readCapacityUnitDiff = calculateReadCapacityUnitDiff( 110 operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()], 111 readConsumed); 112 113 for (final QuotaLimiter limiter : limiters) { 114 if (writeDiff != 0) { 115 limiter.consumeWrite(writeDiff, writeCapacityUnitDiff); 116 } 117 if (readDiff != 0) { 118 limiter.consumeRead(readDiff, readCapacityUnitDiff); 119 } 120 } 121 } 122 123 @Override 124 public long getReadAvailable() { 125 return readAvailable; 126 } 127 128 @Override 129 public long getWriteAvailable() { 130 return writeAvailable; 131 } 132 133 @Override 134 public void addGetResult(final Result result) { 135 operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result); 136 } 137 138 @Override 139 public void addScanResult(final List<Result> results) { 140 operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results); 141 } 142 143 @Override 144 public void addMutation(final Mutation mutation) { 145 operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); 146 } 147 148 /** 149 * Update estimate quota(read/write size/capacityUnits) which will be consumed 150 * @param numWrites the number of write requests 151 * @param numReads the number of read requests 152 * @param numScans the number of scan requests 153 */ 154 protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { 155 writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); 156 readConsumed = estimateConsume(OperationType.GET, numReads, 100); 157 readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); 158 159 writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); 160 readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); 161 } 162 163 private long estimateConsume(final OperationType type, int numReqs, long avgSize) { 164 if (numReqs > 0) { 165 return avgSize * numReqs; 166 } 167 return 0; 168 } 169 170 private long calculateWriteCapacityUnit(final long size) { 171 return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit); 172 } 173 174 private long calculateReadCapacityUnit(final long size) { 175 return (long) Math.ceil(size * 1.0 / this.readCapacityUnit); 176 } 177 178 private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) { 179 return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize); 180 } 181 182 private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) { 183 return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize); 184 } 185}