/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
 * {@link OperationQuota} implementation that charges an up-front <em>estimate</em> against every
 * configured {@link QuotaLimiter} in {@link #checkQuota(int, int, int)}, accumulates the sizes
 * reported through the {@code add*} methods while the operation runs, and in {@link #close()}
 * hands the difference between the accumulated and the estimated amounts back to the limiters.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DefaultOperationQuota implements OperationQuota {

  // Per-request sizes assumed when estimating consumption before the operation has run.
  // NOTE(review): presumably bytes, matching the sizes returned by QuotaUtil - confirm.
  private static final long ESTIMATED_MUTATE_SIZE = 100;
  private static final long ESTIMATED_GET_SIZE = 100;
  private static final long ESTIMATED_SCAN_SIZE = 1000;

  protected final List<QuotaLimiter> limiters;
  private final long writeCapacityUnit;
  private final long readCapacityUnit;

  // the available read quota size in bytes: the smallest value reported by the non-bypassed
  // limiters during the last checkQuota call
  protected long readAvailable = 0;
  // estimated quota
  protected long writeConsumed = 0;
  protected long readConsumed = 0;
  protected long writeCapacityUnitConsumed = 0;
  protected long readCapacityUnitConsumed = 0;
  // real consumed quota, indexed by OperationType.ordinal()
  private final long[] operationSize;
  // difference between estimated quota and real consumed quota used in close method
  // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
  // of DefaultOperationQuota
  protected long writeDiff = 0;
  protected long readDiff = 0;
  protected long writeCapacityUnitDiff = 0;
  protected long readCapacityUnitDiff = 0;

  /**
   * Varargs convenience constructor; delegates to
   * {@link #DefaultOperationQuota(Configuration, List)}.
   * @param conf     configuration the read/write capacity unit sizes are read from
   * @param limiters the limiters to check and charge, in priority order
   */
  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
    this(conf, Arrays.asList(limiters));
  }

  /**
   * NOTE: The order matters. It should be something like [user, table, namespace, global]
   * @param conf     configuration the read/write capacity unit sizes are read from
   * @param limiters the limiters to check and charge, in priority order
   */
  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
    this.writeCapacityUnit =
      conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
    this.readCapacityUnit =
      conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
    this.limiters = limiters;
    // A new long[] is zero-filled by the JVM, so no explicit initialization loop is needed.
    this.operationSize = new long[OperationType.values().length];
  }

  /**
   * Estimates the cost of the requested operations, verifies it against every non-bypassed
   * limiter and then grabs the estimated amount from all limiters.
   * @param numWrites the number of write requests
   * @param numReads  the number of read requests
   * @param numScans  the number of scan requests
   * @throws RpcThrottlingException propagated from the limiters; presumably raised when a quota
   *                                would be exceeded
   */
  @Override
  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
    updateEstimateConsumeQuota(numWrites, numReads, numScans);

    // Check every limiter first so that nothing is grabbed if any of them throws.
    readAvailable = Long.MAX_VALUE;
    for (final QuotaLimiter limiter : limiters) {
      if (limiter.isBypass()) {
        continue;
      }

      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
        writeCapacityUnitConsumed, readCapacityUnitConsumed);
      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
    }

    // NOTE(review): grabQuota is invoked on bypassed limiters as well - presumably a no-op for
    // them; confirm against the QuotaLimiter implementations.
    for (final QuotaLimiter limiter : limiters) {
      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
        writeCapacityUnitConsumed, readCapacityUnitConsumed);
    }
  }

  /**
   * Computes the difference between the sizes accumulated through the {@code add*} methods and
   * the amounts estimated in {@link #checkQuota(int, int, int)}, and passes every non-zero
   * difference to each limiter.
   */
  @Override
  public void close() {
    final long mutateSize = operationSize[OperationType.MUTATE.ordinal()];
    final long readSize =
      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];

    // Adjust the quota consumed for the specified operation
    writeDiff = mutateSize - writeConsumed;
    readDiff = readSize - readConsumed;
    writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(mutateSize, writeConsumed);
    readCapacityUnitDiff = calculateReadCapacityUnitDiff(readSize, readConsumed);

    for (final QuotaLimiter limiter : limiters) {
      if (writeDiff != 0) {
        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
      }
      if (readDiff != 0) {
        limiter.consumeRead(readDiff, readCapacityUnitDiff);
      }
    }
  }

  /**
   * @return the read availability computed by the last {@link #checkQuota(int, int, int)} call,
   *         or 0 if it has not been called yet
   */
  @Override
  public long getReadAvailable() {
    return readAvailable;
  }

  /**
   * Adds the size of a get result to the accumulated GET size.
   * @param result the result returned by the get
   */
  @Override
  public void addGetResult(final Result result) {
    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
  }

  /**
   * Adds the size of a batch of scan results to the accumulated SCAN size.
   * @param results the results returned by the scan
   */
  @Override
  public void addScanResult(final List<Result> results) {
    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
  }

  /**
   * Adds the size of a mutation to the accumulated MUTATE size.
   * @param mutation the mutation that was applied
   */
  @Override
  public void addMutation(final Mutation mutation) {
    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
  }

  /**
   * Update estimate quota(read/write size/capacityUnits) which will be consumed
   * @param numWrites the number of write requests
   * @param numReads  the number of read requests
   * @param numScans  the number of scan requests
   */
  protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
    writeConsumed = estimateConsume(numWrites, ESTIMATED_MUTATE_SIZE);
    readConsumed = estimateConsume(numReads, ESTIMATED_GET_SIZE);
    readConsumed += estimateConsume(numScans, ESTIMATED_SCAN_SIZE);

    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
  }

  /**
   * Estimates the total size of {@code numReqs} requests of {@code avgSize} each; non-positive
   * request counts cost nothing.
   */
  private static long estimateConsume(int numReqs, long avgSize) {
    if (numReqs > 0) {
      return avgSize * numReqs;
    }
    return 0;
  }

  /** Converts a write size into write capacity units, rounding up. */
  private long calculateWriteCapacityUnit(final long size) {
    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
  }

  /** Converts a read size into read capacity units, rounding up. */
  private long calculateReadCapacityUnit(final long size) {
    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
  }

  /** Write capacity units of the actual size minus those of the estimated size. */
  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
  }

  /** Read capacity units of the actual size minus those of the estimated size. */
  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
  }
}