1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.io.IOException;
15 import java.util.List;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.apache.hadoop.hbase.TableName;
20 import org.apache.hadoop.hbase.classification.InterfaceAudience;
21 import org.apache.hadoop.hbase.classification.InterfaceStability;
22 import org.apache.hadoop.hbase.ipc.RpcScheduler;
23 import org.apache.hadoop.hbase.ipc.RpcServer;
24 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
25 import org.apache.hadoop.hbase.regionserver.Region;
26 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
27 import org.apache.hadoop.hbase.security.User;
28 import org.apache.hadoop.security.UserGroupInformation;
29
30 import com.google.common.annotations.VisibleForTesting;
31
32
33
34
35
36
37
38 @InterfaceAudience.Private
39 @InterfaceStability.Evolving
40 public class RegionServerQuotaManager {
41 private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
42
43 private final RegionServerServices rsServices;
44
45 private QuotaCache quotaCache = null;
46
47 public RegionServerQuotaManager(final RegionServerServices rsServices) {
48 this.rsServices = rsServices;
49 }
50
51 public void start(final RpcScheduler rpcScheduler) throws IOException {
52 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
53 LOG.info("Quota support disabled");
54 return;
55 }
56
57 LOG.info("Initializing quota support");
58
59
60 quotaCache = new QuotaCache(rsServices);
61 quotaCache.start();
62 }
63
64 public void stop() {
65 if (isQuotaEnabled()) {
66 quotaCache.stop("shutdown");
67 }
68 }
69
70 public boolean isQuotaEnabled() {
71 return quotaCache != null;
72 }
73
74 @VisibleForTesting
75 QuotaCache getQuotaCache() {
76 return quotaCache;
77 }
78
79
80
81
82
83
84
85 public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
86 if (isQuotaEnabled() && !table.isSystemTable()) {
87 UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
88 QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
89 boolean useNoop = userLimiter.isBypass();
90 if (userQuotaState.hasBypassGlobals()) {
91 if (LOG.isTraceEnabled()) {
92 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
93 }
94 if (!useNoop) {
95 return new DefaultOperationQuota(userLimiter);
96 }
97 } else {
98 QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
99 QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
100 useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
101 if (LOG.isTraceEnabled()) {
102 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
103 + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
104 }
105 if (!useNoop) {
106 return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
107 }
108 }
109 }
110 return NoopOperationQuota.get();
111 }
112
113
114
115
116
117
118
119
120
121 public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
122 throws IOException, ThrottlingException {
123 switch (type) {
124 case SCAN:
125 return checkQuota(region, 0, 0, 1);
126 case GET:
127 return checkQuota(region, 0, 1, 0);
128 case MUTATE:
129 return checkQuota(region, 1, 0, 0);
130 default:
131 throw new RuntimeException("Invalid operation type: " + type);
132 }
133 }
134
135
136
137
138
139
140
141
142
143 public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
144 throws IOException, ThrottlingException {
145 int numWrites = 0;
146 int numReads = 0;
147 for (final ClientProtos.Action action : actions) {
148 if (action.hasMutation()) {
149 numWrites++;
150 } else if (action.hasGet()) {
151 numReads++;
152 }
153 }
154 return checkQuota(region, numWrites, numReads, 0);
155 }
156
157
158
159
160
161
162
163
164
165
166
167 private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
168 final int numScans) throws IOException, ThrottlingException {
169 User user = RpcServer.getRequestUser();
170 UserGroupInformation ugi;
171 if (user != null) {
172 ugi = user.getUGI();
173 } else {
174 ugi = User.getCurrent().getUGI();
175 }
176 TableName table = region.getTableDesc().getTableName();
177
178 OperationQuota quota = getQuota(ugi, table);
179 try {
180 quota.checkQuota(numWrites, numReads, numScans);
181 } catch (ThrottlingException e) {
182 LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
183 + " numWrites=" + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": "
184 + e.getMessage());
185 throw e;
186 }
187 return quota;
188 }
189 }