1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.util.Arrays;
15 import java.util.List;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.apache.hadoop.hbase.classification.InterfaceAudience;
20 import org.apache.hadoop.hbase.classification.InterfaceStability;
21 import org.apache.hadoop.hbase.client.Mutation;
22 import org.apache.hadoop.hbase.client.Result;
23
24 @InterfaceAudience.Private
25 @InterfaceStability.Evolving
26 public class DefaultOperationQuota implements OperationQuota {
27 private static final Log LOG = LogFactory.getLog(DefaultOperationQuota.class);
28
29 private final List<QuotaLimiter> limiters;
30 private long writeAvailable = 0;
31 private long readAvailable = 0;
32 private long writeConsumed = 0;
33 private long readConsumed = 0;
34
35 private AvgOperationSize avgOpSize = new AvgOperationSize();
36
37 public DefaultOperationQuota(final QuotaLimiter... limiters) {
38 this(Arrays.asList(limiters));
39 }
40
41
42
43
44 public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
45 this.limiters = limiters;
46 }
47
48 @Override
49 public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException {
50 writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
51 readConsumed = estimateConsume(OperationType.GET, numReads, 100);
52 readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
53
54 writeAvailable = Long.MAX_VALUE;
55 readAvailable = Long.MAX_VALUE;
56 for (final QuotaLimiter limiter : limiters) {
57 if (limiter.isBypass()) continue;
58
59 limiter.checkQuota(writeConsumed, readConsumed);
60 readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
61 writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
62 }
63
64 for (final QuotaLimiter limiter : limiters) {
65 limiter.grabQuota(writeConsumed, readConsumed);
66 }
67 }
68
69 @Override
70 public void close() {
71
72 long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
73 long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
74 long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
75 for (final QuotaLimiter limiter : limiters) {
76 limiter.addOperationSize(OperationType.GET, getSize);
77 limiter.addOperationSize(OperationType.SCAN, scanSize);
78 limiter.addOperationSize(OperationType.MUTATE, mutationSize);
79 }
80
81
82 long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
83 long readDiff =
84 (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize
85 .getOperationSize(OperationType.SCAN)) - readConsumed;
86 for (final QuotaLimiter limiter : limiters) {
87 if (writeDiff != 0) limiter.consumeWrite(writeDiff);
88 if (readDiff != 0) limiter.consumeRead(readDiff);
89 }
90 }
91
92 @Override
93 public long getReadAvailable() {
94 return readAvailable;
95 }
96
97 @Override
98 public long getWriteAvailable() {
99 return writeAvailable;
100 }
101
102 @Override
103 public void addGetResult(final Result result) {
104 avgOpSize.addGetResult(result);
105 }
106
107 @Override
108 public void addScanResult(final List<Result> results) {
109 avgOpSize.addScanResult(results);
110 }
111
112 @Override
113 public void addMutation(final Mutation mutation) {
114 avgOpSize.addMutation(mutation);
115 }
116
117 @Override
118 public long getAvgOperationSize(OperationType type) {
119 return avgOpSize.getAvgOperationSize(type);
120 }
121
122 private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
123 if (numReqs > 0) {
124 for (final QuotaLimiter limiter : limiters) {
125 long size = limiter.getAvgOperationSize(type);
126 if (size > 0) {
127 avgSize = size;
128 break;
129 }
130 }
131 return avgSize * numReqs;
132 }
133 return 0;
134 }
135 }