1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.util.Arrays;
15 import java.util.List;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.apache.hadoop.hbase.classification.InterfaceAudience;
20 import org.apache.hadoop.hbase.classification.InterfaceStability;
21 import org.apache.hadoop.hbase.client.Mutation;
22 import org.apache.hadoop.hbase.client.Result;
23
24 @InterfaceAudience.Private
25 @InterfaceStability.Evolving
26 public class DefaultOperationQuota implements OperationQuota {
27 private static final Log LOG = LogFactory.getLog(DefaultOperationQuota.class);
28
29 private final List<QuotaLimiter> limiters;
30 private long writeAvailable = 0;
31 private long readAvailable = 0;
32 private long writeConsumed = 0;
33 private long readConsumed = 0;
34 private final long[] operationSize;
35
36 public DefaultOperationQuota(final QuotaLimiter... limiters) {
37 this(Arrays.asList(limiters));
38 }
39
40
41
42
43 public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
44 this.limiters = limiters;
45 int size = OperationType.values().length;
46 operationSize = new long[size];
47
48 for (int i = 0; i < size; ++i) {
49 operationSize[i] = 0;
50 }
51 }
52
53 @Override
54 public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException {
55 writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
56 readConsumed = estimateConsume(OperationType.GET, numReads, 100);
57 readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
58
59 writeAvailable = Long.MAX_VALUE;
60 readAvailable = Long.MAX_VALUE;
61 for (final QuotaLimiter limiter : limiters) {
62 if (limiter.isBypass()) continue;
63
64 limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
65 readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
66 writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
67 }
68
69 for (final QuotaLimiter limiter : limiters) {
70 limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
71 }
72 }
73
74 @Override
75 public void close() {
76
77 long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
78 long readDiff = operationSize[OperationType.GET.ordinal()] +
79 operationSize[OperationType.SCAN.ordinal()] - readConsumed;
80
81 for (final QuotaLimiter limiter: limiters) {
82 if (writeDiff != 0) limiter.consumeWrite(writeDiff);
83 if (readDiff != 0) limiter.consumeRead(readDiff);
84 }
85 }
86
87 @Override
88 public long getReadAvailable() {
89 return readAvailable;
90 }
91
92 @Override
93 public long getWriteAvailable() {
94 return writeAvailable;
95 }
96
97 @Override
98 public void addGetResult(final Result result) {
99 operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
100 }
101
102 @Override
103 public void addScanResult(final List<Result> results) {
104 operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
105 }
106
107 @Override
108 public void addMutation(final Mutation mutation) {
109 operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
110 }
111
112 private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
113 if (numReqs > 0) {
114 return avgSize * numReqs;
115 }
116 return 0;
117 }
118 }