/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;

/**
 * Master Quota Manager. It is responsible for initializing the quota table on the first run and
 * for providing the admin operations that interact with the quota table. TODO: FUTURE: The master
 * will be responsible to notify each RS of quota changes and it will do the "quota aggregation"
 * when the QuotaScope is CLUSTER.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterQuotaManager implements RegionStateListener {
  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
  private static final Map<RegionInfo, Long> EMPTY_MAP =
    Collections.unmodifiableMap(new HashMap<>());

  private final MasterServices masterServices;
  private NamedLock<String> namespaceLocks;
  private NamedLock<TableName> tableLocks;
  private NamedLock<String> userLocks;
  private NamedLock<String> regionServerLocks;
  // Polled by checkQuotaSupport() from threads other than the one running start(), hence volatile.
  // It is written last in start() so that a reader observing 'true' also observes the fields
  // assigned before it.
  private volatile boolean initialized = false;
  private NamespaceAuditor namespaceQuotaManager;
  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
  // Storage for quota rpc throttle
  private RpcThrottleStorage rpcThrottleStorage;

  public MasterQuotaManager(final MasterServices masterServices) {
    this.masterServices = masterServices;
  }

  /**
   * Initializes quota support. Does nothing when quota support is disabled in the master
   * configuration. Otherwise creates the quota table if it is missing, allocates the per-subject
   * locks and the region size map, starts the {@link NamespaceAuditor}, creates the rpc throttle
   * storage and finally marks this manager as initialized.
   * @throws IOException if creating the quota table or starting the namespace auditor fails
   */
  public void start() throws IOException {
    // If the user doesn't want the quota support skip all the initializations.
    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
      LOG.info("Quota support disabled");
      return;
    }

    // Create the quota table if missing
    if (!masterServices.getTableDescriptors().exists(QuotaUtil.QUOTA_TABLE_NAME)) {
      LOG.info("Quota table not found. Creating...");
      createQuotaTable();
    }

    LOG.info("Initializing quota support");
    namespaceLocks = new NamedLock<>();
    tableLocks = new NamedLock<>();
    userLocks = new NamedLock<>();
    regionServerLocks = new NamedLock<>();
    regionSizes = new ConcurrentHashMap<>();

    namespaceQuotaManager = new NamespaceAuditor(masterServices);
    namespaceQuotaManager.start();

    rpcThrottleStorage =
      new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());

    // Publish only after every field guarded by 'initialized' has been assigned; otherwise a
    // concurrent caller could pass the 'initialized' check and dereference a null
    // rpcThrottleStorage.
    initialized = true;
  }

  public void stop() {
  }

  /**
   * @return true if {@link #start()} completed with quota support enabled and the namespace
   *         auditor reports itself initialized
   */
  public boolean isQuotaInitialized() {
    return initialized && namespaceQuotaManager.isInitialized();
  }

  /*
   * ========================================================================== Admin operations to
   * manage the quota table
   */

  /**
   * Applies a quota request. The subject is chosen in this order: user (optionally scoped to a
   * table or namespace), table, namespace, region server. The matching named lock is held while
   * the change is applied.
   * @param req the quota request
   * @return an empty response on success
   * @throws IOException           if quota support is unavailable, no subject is specified, or the
   *                               underlying quota operation fails
   * @throws InterruptedException if interrupted while waiting for the subject's lock
   */
  public SetQuotaResponse setQuota(final SetQuotaRequest req)
    throws IOException, InterruptedException {
    checkQuotaSupport();

    if (req.hasUserName()) {
      userLocks.lock(req.getUserName());
      try {
        if (req.hasTableName()) {
          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
        } else if (req.hasNamespace()) {
          setUserQuota(req.getUserName(), req.getNamespace(), req);
        } else {
          setUserQuota(req.getUserName(), req);
        }
      } finally {
        userLocks.unlock(req.getUserName());
      }
    } else if (req.hasTableName()) {
      TableName table = ProtobufUtil.toTableName(req.getTableName());
      tableLocks.lock(table);
      try {
        setTableQuota(table, req);
      } finally {
        tableLocks.unlock(table);
      }
    } else if (req.hasNamespace()) {
      namespaceLocks.lock(req.getNamespace());
      try {
        setNamespaceQuota(req.getNamespace(), req);
      } finally {
        namespaceLocks.unlock(req.getNamespace());
      }
    } else if (req.hasRegionServer()) {
      regionServerLocks.lock(req.getRegionServer());
      try {
        setRegionServerQuota(req.getRegionServer(), req);
      } finally {
        regionServerLocks.unlock(req.getRegionServer());
      }
    } else {
      throw new DoNotRetryIOException(new UnsupportedOperationException(
        "a user, a table, a namespace or region server must be specified"));
    }
    return SetQuotaResponse.newBuilder().build();
  }

  /**
   * Applies the request to the global quota of the given user.
   */
  public void setUserQuota(final String userName, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
          QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given user on the given table.
   */
  public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(userName, table, null, null,
          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given user on the given namespace.
   */
  public void setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given table. When the quota is deleted and the table's
   * current space quota snapshot shows it in violation with a DISABLE policy, the table is
   * re-enabled if it is not already enabled.
   */
  public void setTableQuota(final TableName table, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, table, null, null,
          QuotaUtil.getTableQuota(masterServices.getConnection(), table));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        // Read the snapshot before deleting the quota so the violation state is still available.
        SpaceQuotaSnapshot currSnapshotOfTable =
          QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
        if (currSnapshotOfTable != null) {
          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
          if (
            SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
              && quotaStatus.isInViolation()
          ) {
            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
          }
        }
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given namespace.
   */
  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, null, namespace, null,
          QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given region server.
   */
  public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
          QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
      }
    });
  }

  /**
   * Registers the namespace with the namespace auditor. No-op when quota is not initialized.
   */
  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.addNamespace(desc);
    }
  }

  /**
   * Removes the namespace from the namespace auditor. No-op when quota is not initialized.
   */
  public void removeNamespaceQuota(String namespace) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.deleteNamespace(namespace);
    }
  }

  /**
   * Switches rpc throttling on or off. When the requested value differs from the stored one a
   * {@link SwitchRpcThrottleProcedure} is submitted and awaited. When quota is not initialized
   * the request is skipped and the previous value is reported as {@code false}.
   * @return a response carrying the previous rpc throttle state
   */
  public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
    throws IOException {
    boolean rpcThrottle = request.getRpcThrottleEnabled();
    if (!initialized) {
      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
      return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
    }

    masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
    boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
    if (rpcThrottle != oldRpcThrottle) {
      LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
        oldRpcThrottle, rpcThrottle);
      ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
      SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
        rpcThrottle, masterServices.getServerName(), latch);
      masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
      latch.await();
    } else {
      LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
        rpcThrottle);
    }
    SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
      .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
    masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
    return response;
  }

  /**
   * Reports whether rpc throttling is enabled, invoking the pre/post coprocessor hooks. Returns
   * {@code false} without invoking the hooks when quota is not initialized.
   */
  public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
    throws IOException {
    if (!initialized) {
      LOG.warn("Skip get rpc throttle because rpc quota is disabled");
      return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
    }

    masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
    boolean enabled = isRpcThrottleEnabled();
    IsRpcThrottleEnabledResponse response =
      IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
    masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
    return response;
  }

  /**
   * @return the stored rpc throttle state, or {@code false} when quota is not initialized
   */
  public boolean isRpcThrottleEnabled() throws IOException {
    return initialized && rpcThrottleStorage.isRpcThrottleEnabled();
  }

  /**
   * Switches the "exceed throttle quota" setting. The stored value is only rewritten when it
   * differs from the requested one. When quota is not initialized the request is skipped and the
   * previous value is reported as {@code false}.
   * @return a response carrying the previous setting
   */
  public SwitchExceedThrottleQuotaResponse
    switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
    boolean enabled = request.getExceedThrottleQuotaEnabled();
    if (!initialized) {
      LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
      return SwitchExceedThrottleQuotaResponse.newBuilder()
        .setPreviousExceedThrottleQuotaEnabled(false).build();
    }

    masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
    boolean previousEnabled =
      QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
    if (previousEnabled == enabled) {
      LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
        enabled);
    } else {
      QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
      LOG.info("{} switch exceed throttle quota from {} to {}",
        masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
    }
    SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
      .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
    masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
      enabled);
    return response;
  }

  /**
   * @return the stored "exceed throttle quota" setting, or {@code false} when quota is not
   *         initialized
   */
  public boolean isExceedThrottleQuotaEnabled() throws IOException {
    return initialized && QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
  }

  /**
   * Shared implementation behind the subject-specific {@code set*Quota} methods. A "remove all"
   * request deletes the subject's quota outright; any other request is merged into the current
   * settings and the result is either persisted or, when the merge yields {@code null}, deleted.
   * The pre/post hooks of {@code quotaOps} are invoked around the change.
   */
  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
    throws IOException, InterruptedException {
    if (req.hasRemoveAll() && req.getRemoveAll()) {
      quotaOps.preApply(null);
      quotaOps.delete();
      quotaOps.postApply(null);
      return;
    }

    // Apply quota changes
    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Current quota for request(" + TextFormat.shortDebugString(req) + "): " + currentQuota);
    }
    // Call the appropriate "pre" CP hook with the current quota value (may be null)
    quotaOps.preApply(currentQuota);
    // Translate the protobuf request back into a POJO
    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Deserialized quota from request: " + newQuota);
    }

    // Merge the current quota settings with the new quota settings the user provided.
    //
    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
    }

    // Submit new changes
    if (mergedQuota == null) {
      quotaOps.delete();
    } else {
      quotaOps.update(mergedQuota);
    }
    // Advertise the final result via the "post" CP hook
    quotaOps.postApply(mergedQuota);
  }

  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
    }
  }

  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
    }
  }

  /**
   * @return cached region count, or -1 if quota manager is disabled or table status not found
   */
  public int getRegionCountOfTable(TableName tName) throws IOException {
    if (initialized) {
      return namespaceQuotaManager.getRegionCountOfTable(tName);
    }
    return -1;
  }

  @Override
  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
    if (initialized) {
      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
    }
  }

  @Override
  public void onRegionSplit(RegionInfo hri) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
    }
  }

  /**
   * Remove table from namespace quota.
   * @param tName - The table name to update quota usage.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
    if (initialized) {
      namespaceQuotaManager.removeFromNamespaceUsage(tName);
    }
  }

  public NamespaceAuditor getNamespaceQuotaManager() {
    return this.namespaceQuotaManager;
  }

  /**
   * Encapsulates CRUD quota operations for some subject.
   */
  private static interface SetQuotaOperations {
    /**
     * Fetches the current quota settings for the subject.
     */
    GlobalQuotaSettingsImpl fetch() throws IOException;

    /**
     * Deletes the quota for the subject.
     */
    void delete() throws IOException;

    /**
     * Persist the given quota for the subject.
     */
    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;

    /**
     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current quota
     * for the subject.
     */
    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;

    /**
     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting quota
     * from the request action for the subject.
     */
    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
  }

  /*
   * ========================================================================== Helpers
   */

  /**
   * Verifies that quota support is enabled and initialized. If the manager is not yet
   * initialized, polls every 100ms for up to {@code hbase.master.wait.for.quota.manager.init}
   * milliseconds (default 30000) before giving up.
   * @throws IOException if quota support is disabled, or the manager is still uninitialized after
   *                     waiting (or after the wait was interrupted)
   */
  private void checkQuotaSupport() throws IOException {
    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
      throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled"));
    }
    if (!initialized) {
      long maxWaitTime = masterServices.getConfiguration()
        .getLong("hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
      long startTime = EnvironmentEdgeManager.currentTime();
      do {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
          // Re-assert the interrupt status so code further up the stack can still observe it.
          Thread.currentThread().interrupt();
          break;
        }
      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
      if (!initialized) {
        throw new IOException("Quota manager is uninitialized, please retry later.");
      }
    }
  }

  private void createQuotaTable() throws IOException {
    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
  }

  /**
   * A set of held names guarded by a single monitor. {@link #lock(Object)} blocks while the name
   * is already present; {@link #unlock(Object)} removes it and wakes all waiters.
   */
  private static class NamedLock<T> {
    private final Set<T> locks = new HashSet<>();

    public void lock(final T name) throws InterruptedException {
      synchronized (locks) {
        while (locks.contains(name)) {
          locks.wait();
        }
        locks.add(name);
      }
    }

    public void unlock(final T name) {
      synchronized (locks) {
        locks.remove(name);
        locks.notifyAll();
      }
    }
  }

  @Override
  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
    }
  }

  /**
   * Holds the size of a region at the given time, millis since the epoch.
   */
  private static class SizeSnapshotWithTimestamp {
    private final long size;
    private final long time;

    public SizeSnapshotWithTimestamp(long size, long time) {
      this.size = size;
      this.time = time;
    }

    public long getSize() {
      return size;
    }

    public long getTime() {
      return time;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof SizeSnapshotWithTimestamp) {
        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
        return size == other.size && time == other.time;
      }
      return false;
    }

    @Override
    public int hashCode() {
      HashCodeBuilder hcb = new HashCodeBuilder();
      return hcb.append(size).append(time).toHashCode();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(32);
      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
      sb.append("time=").append(time).append("}");
      return sb.toString();
    }
  }

  void initializeRegionSizes() {
    assert regionSizes == null;
    this.regionSizes = new ConcurrentHashMap<>();
  }

  /**
   * Records the size of a region as of the given time, replacing any earlier entry for that
   * region. No-op when the region size map has not been created.
   */
  public void addRegionSize(RegionInfo hri, long size, long time) {
    if (regionSizes == null) {
      return;
    }
    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
  }

  /**
   * @return a copy of the recorded region sizes (without timestamps), or an unmodifiable empty
   *         map when the region size map has not been created
   */
  public Map<RegionInfo, Long> snapshotRegionSizes() {
    if (regionSizes == null) {
      return EMPTY_MAP;
    }

    Map<RegionInfo, Long> copy = new HashMap<>();
    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
      copy.put(entry.getKey(), entry.getValue().getSize());
    }
    return copy;
  }

  /**
   * Removes region size entries recorded before {@code timeToPruneBefore}, except for regions
   * whose table (or namespace) is in violation with a DISABLE policy.
   * @return the number of entries removed
   */
  int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) {
    if (regionSizes == null) {
      return 0;
    }
    int numEntriesRemoved = 0;
    Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator =
      regionSizes.entrySet().iterator();
    while (iterator.hasNext()) {
      Entry<RegionInfo, SizeSnapshotWithTimestamp> entry = iterator.next();
      RegionInfo regionInfo = entry.getKey();
      // Read the timestamp from the iterated entry rather than looking the key up again: a second
      // lookup can return null if the entry is removed concurrently.
      long currentEntryTime = entry.getValue().getTime();
      // do not prune the entries if table is in violation and
      // violation policy is disable to avoid cycle of enable/disable.
      // Please refer HBASE-22012 for more details.
      // prune entries older than time.
      if (
        currentEntryTime < timeToPruneBefore
          && !isInViolationAndPolicyDisable(regionInfo.getTable(), quotaObserverChore)
      ) {
        iterator.remove();
        numEntriesRemoved++;
      }
    }
    return numEntriesRemoved;
  }

  /**
   * Method to check if a table is in violation and policy set on table is DISABLE.
   * @param tableName          tableName to check.
   * @param quotaObserverChore QuotaObserverChore instance
   * @return returns true if table is in violation and policy is disable else false.
   */
  private boolean isInViolationAndPolicyDisable(TableName tableName,
    QuotaObserverChore quotaObserverChore) {
    boolean isInViolationAtTable = false;
    boolean isInViolationAtNamespace = false;
    SpaceViolationPolicy tablePolicy = null;
    SpaceViolationPolicy namespacePolicy = null;
    // Get Current Snapshot for the given table
    SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName);
    SpaceQuotaSnapshot namespaceQuotaSnapshot =
      quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString());
    if (tableQuotaSnapshot != null) {
      // check if table in violation
      isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation();
      Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy();
      if (policy.isPresent()) {
        tablePolicy = policy.get();
      }
    }
    if (namespaceQuotaSnapshot != null) {
      // check namespace in violation
      isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation();
      Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy();
      if (policy.isPresent()) {
        namespacePolicy = policy.get();
      }
    }
    return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable)
      || (namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace);
  }

  /**
   * Groups the archived files in the request by table and hands each group to the
   * {@link FileArchiverNotifier} obtained for that table.
   */
  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
    Configuration conf, FileSystem fs) throws IOException {
    final HashMultimap<TableName, Entry<String, Long>> archivedFilesByTable = HashMultimap.create();
    // Group the archived files by table
    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
      archivedFilesByTable.put(tn,
        Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
    }
    // Report each set of files to the appropriate object
    for (TableName tn : archivedFilesByTable.keySet()) {
      final Set<Entry<String, Long>> filesWithSize = archivedFilesByTable.get(tn);
      final FileArchiverNotifier notifier =
        FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
      notifier.addArchivedFiles(filesWithSize);
    }
  }

  /**
   * Removes each region size entry where the RegionInfo references the provided TableName. No-op
   * when the region size map has not been created.
   * @param tableName tableName.
   */
  public void removeRegionSizesForTable(TableName tableName) {
    if (regionSizes == null) {
      return;
    }
    regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
  }
}