1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.io.IOException;
15 import java.util.HashSet;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.apache.hadoop.hbase.DoNotRetryIOException;
20 import org.apache.hadoop.hbase.HRegionInfo;
21 import org.apache.hadoop.hbase.MetaTableAccessor;
22 import org.apache.hadoop.hbase.NamespaceDescriptor;
23 import org.apache.hadoop.hbase.RegionStateListener;
24 import org.apache.hadoop.hbase.TableName;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.master.MasterServices;
28 import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
29 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
30 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
31 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
32 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
33 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
34 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
35 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.ThrottleRequest;
36 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
37 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
38
39
40
41
42
43
44
45 @InterfaceAudience.Private
46 @InterfaceStability.Evolving
47 public class MasterQuotaManager implements RegionStateListener {
48 private static final Log LOG = LogFactory.getLog(MasterQuotaManager.class);
49
50 private final MasterServices masterServices;
51 private NamedLock<String> namespaceLocks;
52 private NamedLock<TableName> tableLocks;
53 private NamedLock<String> userLocks;
54 private boolean initialized = false;
55 private NamespaceAuditor namespaceQuotaManager;
56
/**
 * Creates a quota manager bound to the given master. Only the reference is stored here;
 * locks and the namespace auditor are set up later by {@link #start()}.
 *
 * @param masterServices the master services this manager delegates to
 */
public MasterQuotaManager(final MasterServices masterServices) {
  this.masterServices = masterServices;
}
60
61 public void start() throws IOException {
62
63 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
64 LOG.info("Quota support disabled");
65 return;
66 }
67
68
69 if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
70 QuotaUtil.QUOTA_TABLE_NAME)) {
71 LOG.info("Quota table not found. Creating...");
72 createQuotaTable();
73 }
74
75 LOG.info("Initializing quota support");
76 namespaceLocks = new NamedLock<String>();
77 tableLocks = new NamedLock<TableName>();
78 userLocks = new NamedLock<String>();
79
80 namespaceQuotaManager = new NamespaceAuditor(masterServices);
81 namespaceQuotaManager.start();
82 initialized = true;
83 }
84
85 public void stop() {
86 }
87
88 public boolean isQuotaInitialized() {
89 return initialized && namespaceQuotaManager.isInitialized();
90 }
91
92
93
94
95
96 public SetQuotaResponse setQuota(final SetQuotaRequest req) throws IOException,
97 InterruptedException {
98 checkQuotaSupport();
99
100 if (req.hasUserName()) {
101 userLocks.lock(req.getUserName());
102 try {
103 if (req.hasTableName()) {
104 setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
105 } else if (req.hasNamespace()) {
106 setUserQuota(req.getUserName(), req.getNamespace(), req);
107 } else {
108 setUserQuota(req.getUserName(), req);
109 }
110 } finally {
111 userLocks.unlock(req.getUserName());
112 }
113 } else if (req.hasTableName()) {
114 TableName table = ProtobufUtil.toTableName(req.getTableName());
115 tableLocks.lock(table);
116 try {
117 setTableQuota(table, req);
118 } finally {
119 tableLocks.unlock(table);
120 }
121 } else if (req.hasNamespace()) {
122 namespaceLocks.lock(req.getNamespace());
123 try {
124 setNamespaceQuota(req.getNamespace(), req);
125 } finally {
126 namespaceLocks.unlock(req.getNamespace());
127 }
128 } else {
129 throw new DoNotRetryIOException(new UnsupportedOperationException(
130 "a user, a table or a namespace must be specified"));
131 }
132 return SetQuotaResponse.newBuilder().build();
133 }
134
135 public void setUserQuota(final String userName, final SetQuotaRequest req) throws IOException,
136 InterruptedException {
137 setQuota(req, new SetQuotaOperations() {
138 @Override
139 public Quotas fetch() throws IOException {
140 return QuotaUtil.getUserQuota(masterServices.getConnection(), userName);
141 }
142
143 @Override
144 public void update(final Quotas quotas) throws IOException {
145 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotas);
146 }
147
148 @Override
149 public void delete() throws IOException {
150 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
151 }
152
153 @Override
154 public void preApply(final Quotas quotas) throws IOException {
155 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotas);
156 }
157
158 @Override
159 public void postApply(final Quotas quotas) throws IOException {
160 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotas);
161 }
162 });
163 }
164
165 public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
166 throws IOException, InterruptedException {
167 setQuota(req, new SetQuotaOperations() {
168 @Override
169 public Quotas fetch() throws IOException {
170 return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table);
171 }
172
173 @Override
174 public void update(final Quotas quotas) throws IOException {
175 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, quotas);
176 }
177
178 @Override
179 public void delete() throws IOException {
180 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
181 }
182
183 @Override
184 public void preApply(final Quotas quotas) throws IOException {
185 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotas);
186 }
187
188 @Override
189 public void postApply(final Quotas quotas) throws IOException {
190 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotas);
191 }
192 });
193 }
194
195 public void
196 setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
197 throws IOException, InterruptedException {
198 setQuota(req, new SetQuotaOperations() {
199 @Override
200 public Quotas fetch() throws IOException {
201 return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace);
202 }
203
204 @Override
205 public void update(final Quotas quotas) throws IOException {
206 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, quotas);
207 }
208
209 @Override
210 public void delete() throws IOException {
211 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
212 }
213
214 @Override
215 public void preApply(final Quotas quotas) throws IOException {
216 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotas);
217 }
218
219 @Override
220 public void postApply(final Quotas quotas) throws IOException {
221 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotas);
222 }
223 });
224 }
225
226 public void setTableQuota(final TableName table, final SetQuotaRequest req) throws IOException,
227 InterruptedException {
228 setQuota(req, new SetQuotaOperations() {
229 @Override
230 public Quotas fetch() throws IOException {
231 return QuotaUtil.getTableQuota(masterServices.getConnection(), table);
232 }
233
234 @Override
235 public void update(final Quotas quotas) throws IOException {
236 QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotas);
237 }
238
239 @Override
240 public void delete() throws IOException {
241 QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
242 }
243
244 @Override
245 public void preApply(final Quotas quotas) throws IOException {
246 masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotas);
247 }
248
249 @Override
250 public void postApply(final Quotas quotas) throws IOException {
251 masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotas);
252 }
253 });
254 }
255
256 public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
257 throws IOException, InterruptedException {
258 setQuota(req, new SetQuotaOperations() {
259 @Override
260 public Quotas fetch() throws IOException {
261 return QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace);
262 }
263
264 @Override
265 public void update(final Quotas quotas) throws IOException {
266 QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, quotas);
267 }
268
269 @Override
270 public void delete() throws IOException {
271 QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
272 }
273
274 @Override
275 public void preApply(final Quotas quotas) throws IOException {
276 masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotas);
277 }
278
279 @Override
280 public void postApply(final Quotas quotas) throws IOException {
281 masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotas);
282 }
283 });
284 }
285
286 public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
287 if (initialized) {
288 this.namespaceQuotaManager.addNamespace(desc);
289 }
290 }
291
292 public void removeNamespaceQuota(String namespace) throws IOException {
293 if (initialized) {
294 this.namespaceQuotaManager.deleteNamespace(namespace);
295 }
296 }
297
298 private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
299 throws IOException, InterruptedException {
300 if (req.hasRemoveAll() && req.getRemoveAll() == true) {
301 quotaOps.preApply(null);
302 quotaOps.delete();
303 quotaOps.postApply(null);
304 return;
305 }
306
307
308 Quotas quotas = quotaOps.fetch();
309 quotaOps.preApply(quotas);
310
311 Quotas.Builder builder = (quotas != null) ? quotas.toBuilder() : Quotas.newBuilder();
312 if (req.hasThrottle()) applyThrottle(builder, req.getThrottle());
313 if (req.hasBypassGlobals()) applyBypassGlobals(builder, req.getBypassGlobals());
314
315
316 quotas = builder.build();
317 if (QuotaUtil.isEmptyQuota(quotas)) {
318 quotaOps.delete();
319 } else {
320 quotaOps.update(quotas);
321 }
322 quotaOps.postApply(quotas);
323 }
324
325 public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
326 if (initialized) {
327 namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
328 }
329 }
330
331 public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
332 if (initialized) {
333 namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
334 }
335 }
336
337
338
339
340 public int getRegionCountOfTable(TableName tName) throws IOException {
341 if (initialized) {
342 return namespaceQuotaManager.getRegionCountOfTable(tName);
343 }
344 return -1;
345 }
346
347 public void onRegionMerged(HRegionInfo hri) throws IOException {
348 if (initialized) {
349 namespaceQuotaManager.updateQuotaForRegionMerge(hri);
350 }
351 }
352
353 public void onRegionSplit(HRegionInfo hri) throws IOException {
354 if (initialized) {
355 namespaceQuotaManager.checkQuotaToSplitRegion(hri);
356 }
357 }
358
359
360
361
362
363
364 public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
365 if (initialized) {
366 namespaceQuotaManager.removeFromNamespaceUsage(tName);
367 }
368 }
369
370 public NamespaceAuditor getNamespaceQuotaManager() {
371 return this.namespaceQuotaManager;
372 }
373
374 private static interface SetQuotaOperations {
375 Quotas fetch() throws IOException;
376
377 void delete() throws IOException;
378
379 void update(final Quotas quotas) throws IOException;
380
381 void preApply(final Quotas quotas) throws IOException;
382
383 void postApply(final Quotas quotas) throws IOException;
384 }
385
386
387
388
389
390 private void applyThrottle(final Quotas.Builder quotas, final ThrottleRequest req)
391 throws IOException {
392 Throttle.Builder throttle;
393
394 if (req.hasType() && (req.hasTimedQuota() || quotas.hasThrottle())) {
395
396 if (req.hasTimedQuota()) {
397 validateTimedQuota(req.getTimedQuota());
398 }
399
400
401 throttle = quotas.hasThrottle() ? quotas.getThrottle().toBuilder() : Throttle.newBuilder();
402
403 switch (req.getType()) {
404 case REQUEST_NUMBER:
405 if (req.hasTimedQuota()) {
406 throttle.setReqNum(req.getTimedQuota());
407 } else {
408 throttle.clearReqNum();
409 }
410 break;
411 case REQUEST_SIZE:
412 if (req.hasTimedQuota()) {
413 throttle.setReqSize(req.getTimedQuota());
414 } else {
415 throttle.clearReqSize();
416 }
417 break;
418 case WRITE_NUMBER:
419 if (req.hasTimedQuota()) {
420 throttle.setWriteNum(req.getTimedQuota());
421 } else {
422 throttle.clearWriteNum();
423 }
424 break;
425 case WRITE_SIZE:
426 if (req.hasTimedQuota()) {
427 throttle.setWriteSize(req.getTimedQuota());
428 } else {
429 throttle.clearWriteSize();
430 }
431 break;
432 case READ_NUMBER:
433 if (req.hasTimedQuota()) {
434 throttle.setReadNum(req.getTimedQuota());
435 } else {
436 throttle.clearReqNum();
437 }
438 break;
439 case READ_SIZE:
440 if (req.hasTimedQuota()) {
441 throttle.setReadSize(req.getTimedQuota());
442 } else {
443 throttle.clearReadSize();
444 }
445 break;
446 default:
447 throw new RuntimeException("Invalid throttle type: " + req.getType());
448 }
449 quotas.setThrottle(throttle.build());
450 } else {
451 quotas.clearThrottle();
452 }
453 }
454
455 private void applyBypassGlobals(final Quotas.Builder quotas, boolean bypassGlobals) {
456 if (bypassGlobals) {
457 quotas.setBypassGlobals(bypassGlobals);
458 } else {
459 quotas.clearBypassGlobals();
460 }
461 }
462
463 private void validateTimedQuota(final TimedQuota timedQuota) throws IOException {
464 if (timedQuota.getSoftLimit() < 1) {
465 throw new DoNotRetryIOException(new UnsupportedOperationException(
466 "The throttle limit must be greater then 0, got " + timedQuota.getSoftLimit()));
467 }
468 }
469
470
471
472
473
474 private void checkQuotaSupport() throws IOException {
475 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
476 throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled"));
477 }
478 if (!initialized) {
479 long maxWaitTime = masterServices.getConfiguration().getLong(
480 "hbase.master.wait.for.quota.manager.init", 30000);
481 long startTime = EnvironmentEdgeManager.currentTime();
482 do {
483 try {
484 Thread.sleep(100);
485 } catch (InterruptedException e) {
486 LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
487 break;
488 }
489 } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
490 if (!initialized) {
491 throw new IOException("Quota manager is uninitialized, please retry later.");
492 }
493 }
494 }
495
496 private void createQuotaTable() throws IOException {
497 HRegionInfo[] newRegions = new HRegionInfo[] { new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME) };
498
499 masterServices.getMasterProcedureExecutor()
500 .submitProcedure(new CreateTableProcedure(
501 masterServices.getMasterProcedureExecutor().getEnvironment(),
502 QuotaUtil.QUOTA_TABLE_DESC,
503 newRegions));
504 }
505
506 private static class NamedLock<T> {
507 private HashSet<T> locks = new HashSet<T>();
508
509 public void lock(final T name) throws InterruptedException {
510 synchronized (locks) {
511 while (locks.contains(name)) {
512 locks.wait();
513 }
514 locks.add(name);
515 }
516 }
517
518 public void unlock(final T name) {
519 synchronized (locks) {
520 locks.remove(name);
521 locks.notifyAll();
522 }
523 }
524 }
525
526 @Override
527 public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
528 if (initialized) {
529 this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
530 }
531 }
532 }