001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.IOException; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import org.apache.commons.lang3.builder.HashCodeBuilder; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.NamespaceDescriptor; 036import org.apache.hadoop.hbase.RegionStateListener; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.master.MasterServices; 041import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; 042import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure; 043import org.apache.hadoop.hbase.namespace.NamespaceAuditor; 044import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.apache.yetus.audience.InterfaceStability; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 052import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 053import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 054 055import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize; 066 067/** 068 * Master Quota Manager. 069 * It is responsible for initialize the quota table on the first-run and 070 * provide the admin operations to interact with the quota table. 071 * 072 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes 073 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER. 074 */ 075@InterfaceAudience.Private 076@InterfaceStability.Evolving 077public class MasterQuotaManager implements RegionStateListener { 078 private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class); 079 private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap( 080 new HashMap<>()); 081 082 private final MasterServices masterServices; 083 private NamedLock<String> namespaceLocks; 084 private NamedLock<TableName> tableLocks; 085 private NamedLock<String> userLocks; 086 private NamedLock<String> regionServerLocks; 087 private boolean initialized = false; 088 private NamespaceAuditor namespaceQuotaManager; 089 private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes; 090 // Storage for quota rpc throttle 091 private RpcThrottleStorage rpcThrottleStorage; 092 093 public MasterQuotaManager(final MasterServices masterServices) { 094 this.masterServices = masterServices; 095 } 096 097 public void start() throws IOException { 098 // If the user doesn't want the quota support skip all the initializations. 099 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) { 100 LOG.info("Quota support disabled"); 101 return; 102 } 103 104 // Create the quota table if missing 105 if (!masterServices.getTableDescriptors().exists(QuotaUtil.QUOTA_TABLE_NAME)) { 106 LOG.info("Quota table not found. Creating..."); 107 createQuotaTable(); 108 } 109 110 LOG.info("Initializing quota support"); 111 namespaceLocks = new NamedLock<>(); 112 tableLocks = new NamedLock<>(); 113 userLocks = new NamedLock<>(); 114 regionServerLocks = new NamedLock<>(); 115 regionSizes = new ConcurrentHashMap<>(); 116 117 namespaceQuotaManager = new NamespaceAuditor(masterServices); 118 namespaceQuotaManager.start(); 119 initialized = true; 120 121 rpcThrottleStorage = 122 new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration()); 123 } 124 125 public void stop() { 126 } 127 128 public boolean isQuotaInitialized() { 129 return initialized && namespaceQuotaManager.isInitialized(); 130 } 131 132 /* ========================================================================== 133 * Admin operations to manage the quota table 134 */ 135 public SetQuotaResponse setQuota(final SetQuotaRequest req) 136 throws IOException, InterruptedException { 137 checkQuotaSupport(); 138 139 if (req.hasUserName()) { 140 userLocks.lock(req.getUserName()); 141 try { 142 if (req.hasTableName()) { 143 setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req); 144 } else if (req.hasNamespace()) { 145 setUserQuota(req.getUserName(), req.getNamespace(), req); 146 } else { 147 setUserQuota(req.getUserName(), req); 148 } 149 } finally { 150 userLocks.unlock(req.getUserName()); 151 } 152 } else if (req.hasTableName()) { 153 TableName table = ProtobufUtil.toTableName(req.getTableName()); 154 tableLocks.lock(table); 155 try { 156 setTableQuota(table, req); 157 } finally { 158 tableLocks.unlock(table); 159 } 160 } else if (req.hasNamespace()) { 161 namespaceLocks.lock(req.getNamespace()); 162 try { 163 setNamespaceQuota(req.getNamespace(), req); 164 } finally { 165 namespaceLocks.unlock(req.getNamespace()); 166 } 167 } else if (req.hasRegionServer()) { 168 regionServerLocks.lock(req.getRegionServer()); 169 try { 170 setRegionServerQuota(req.getRegionServer(), req); 171 } finally { 172 regionServerLocks.unlock(req.getRegionServer()); 173 } 174 } else { 175 throw new DoNotRetryIOException(new UnsupportedOperationException( 176 "a user, a table, a namespace or region server must be specified")); 177 } 178 return SetQuotaResponse.newBuilder().build(); 179 } 180 181 public void setUserQuota(final String userName, final SetQuotaRequest req) 182 throws IOException, InterruptedException { 183 setQuota(req, new SetQuotaOperations() { 184 @Override 185 public GlobalQuotaSettingsImpl fetch() throws IOException { 186 return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null, 187 QuotaUtil.getUserQuota(masterServices.getConnection(), userName)); 188 } 189 @Override 190 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 191 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas()); 192 } 193 @Override 194 public void delete() throws IOException { 195 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName); 196 } 197 @Override 198 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 199 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo); 200 } 201 @Override 202 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 203 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo); 204 } 205 }); 206 } 207 208 public void setUserQuota(final String userName, final TableName table, 209 final SetQuotaRequest req) throws IOException, InterruptedException { 210 setQuota(req, new SetQuotaOperations() { 211 @Override 212 public GlobalQuotaSettingsImpl fetch() throws IOException { 213 return new GlobalQuotaSettingsImpl(userName, table, null, null, 214 QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table)); 215 } 216 @Override 217 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 218 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, 219 quotaPojo.toQuotas()); 220 } 221 @Override 222 public void delete() throws IOException { 223 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table); 224 } 225 @Override 226 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 227 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo); 228 } 229 @Override 230 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 231 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo); 232 } 233 }); 234 } 235 236 public void setUserQuota(final String userName, final String namespace, 237 final SetQuotaRequest req) throws IOException, InterruptedException { 238 setQuota(req, new SetQuotaOperations() { 239 @Override 240 public GlobalQuotaSettingsImpl fetch() throws IOException { 241 return new GlobalQuotaSettingsImpl(userName, null, namespace, null, 242 QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace)); 243 } 244 @Override 245 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 246 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, 247 quotaPojo.toQuotas()); 248 } 249 @Override 250 public void delete() throws IOException { 251 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace); 252 } 253 @Override 254 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 255 masterServices.getMasterCoprocessorHost().preSetUserQuota( 256 userName, namespace, quotaPojo); 257 } 258 @Override 259 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 260 masterServices.getMasterCoprocessorHost().postSetUserQuota( 261 userName, namespace, quotaPojo); 262 } 263 }); 264 } 265 266 public void setTableQuota(final TableName table, final SetQuotaRequest req) 267 throws IOException, InterruptedException { 268 setQuota(req, new SetQuotaOperations() { 269 @Override 270 public GlobalQuotaSettingsImpl fetch() throws IOException { 271 return new GlobalQuotaSettingsImpl(null, table, null, null, 272 QuotaUtil.getTableQuota(masterServices.getConnection(), table)); 273 } 274 @Override 275 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 276 QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas()); 277 } 278 @Override 279 public void delete() throws IOException { 280 SpaceQuotaSnapshot currSnapshotOfTable = 281 QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table); 282 QuotaUtil.deleteTableQuota(masterServices.getConnection(), table); 283 if (currSnapshotOfTable != null) { 284 SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus(); 285 if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null) 286 && quotaStatus.isInViolation()) { 287 QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table); 288 } 289 } 290 } 291 @Override 292 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 293 masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo); 294 } 295 @Override 296 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 297 masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo); 298 } 299 }); 300 } 301 302 public void setNamespaceQuota(final String namespace, final SetQuotaRequest req) 303 throws IOException, InterruptedException { 304 setQuota(req, new SetQuotaOperations() { 305 @Override 306 public GlobalQuotaSettingsImpl fetch() throws IOException { 307 return new GlobalQuotaSettingsImpl(null, null, namespace, null, 308 QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace)); 309 } 310 @Override 311 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 312 QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, 313 quotaPojo.toQuotas()); 314 } 315 @Override 316 public void delete() throws IOException { 317 QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace); 318 } 319 @Override 320 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 321 masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo); 322 } 323 @Override 324 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 325 masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo); 326 } 327 }); 328 } 329 330 public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req) 331 throws IOException, InterruptedException { 332 setQuota(req, new SetQuotaOperations() { 333 @Override 334 public GlobalQuotaSettingsImpl fetch() throws IOException { 335 return new GlobalQuotaSettingsImpl(null, null, null, regionServer, 336 QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer)); 337 } 338 339 @Override 340 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 341 QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer, 342 quotaPojo.toQuotas()); 343 } 344 345 @Override 346 public void delete() throws IOException { 347 QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer); 348 } 349 350 @Override 351 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 352 masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo); 353 } 354 355 @Override 356 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 357 masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo); 358 } 359 }); 360 } 361 362 public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException { 363 if (initialized) { 364 this.namespaceQuotaManager.addNamespace(desc); 365 } 366 } 367 368 public void removeNamespaceQuota(String namespace) throws IOException { 369 if (initialized) { 370 this.namespaceQuotaManager.deleteNamespace(namespace); 371 } 372 } 373 374 public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request) 375 throws IOException { 376 boolean rpcThrottle = request.getRpcThrottleEnabled(); 377 if (initialized) { 378 masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle); 379 boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled(); 380 if (rpcThrottle != oldRpcThrottle) { 381 LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(), 382 oldRpcThrottle, rpcThrottle); 383 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 384 SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage, 385 rpcThrottle, masterServices.getServerName(), latch); 386 masterServices.getMasterProcedureExecutor().submitProcedure(procedure); 387 latch.await(); 388 } else { 389 LOG.warn("Skip switch rpc throttle to {} because it's the same with old value", 390 rpcThrottle); 391 } 392 SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder() 393 .setPreviousRpcThrottleEnabled(oldRpcThrottle).build(); 394 masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle); 395 return response; 396 } else { 397 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle); 398 return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build(); 399 } 400 } 401 402 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request) 403 throws IOException { 404 if (initialized) { 405 masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled(); 406 boolean enabled = isRpcThrottleEnabled(); 407 IsRpcThrottleEnabledResponse response = 408 IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build(); 409 masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled); 410 return response; 411 } else { 412 LOG.warn("Skip get rpc throttle because rpc quota is disabled"); 413 return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build(); 414 } 415 } 416 417 public boolean isRpcThrottleEnabled() throws IOException { 418 return initialized ? rpcThrottleStorage.isRpcThrottleEnabled() : false; 419 } 420 421 public SwitchExceedThrottleQuotaResponse 422 switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException { 423 boolean enabled = request.getExceedThrottleQuotaEnabled(); 424 if (initialized) { 425 masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled); 426 boolean previousEnabled = 427 QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection()); 428 if (previousEnabled == enabled) { 429 LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value", 430 enabled); 431 } else { 432 QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled); 433 LOG.info("{} switch exceed throttle quota from {} to {}", 434 masterServices.getClientIdAuditPrefix(), previousEnabled, enabled); 435 } 436 SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder() 437 .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build(); 438 masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled, 439 enabled); 440 return response; 441 } else { 442 LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled); 443 return SwitchExceedThrottleQuotaResponse.newBuilder() 444 .setPreviousExceedThrottleQuotaEnabled(false).build(); 445 } 446 } 447 448 public boolean isExceedThrottleQuotaEnabled() throws IOException { 449 return initialized ? QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection()) 450 : false; 451 } 452 453 private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps) 454 throws IOException, InterruptedException { 455 if (req.hasRemoveAll() && req.getRemoveAll() == true) { 456 quotaOps.preApply(null); 457 quotaOps.delete(); 458 quotaOps.postApply(null); 459 return; 460 } 461 462 // Apply quota changes 463 GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch(); 464 if (LOG.isTraceEnabled()) { 465 LOG.trace( 466 "Current quota for request(" + TextFormat.shortDebugString(req) 467 + "): " + currentQuota); 468 } 469 // Call the appropriate "pre" CP hook with the current quota value (may be null) 470 quotaOps.preApply(currentQuota); 471 // Translate the protobuf request back into a POJO 472 QuotaSettings newQuota = QuotaSettings.buildFromProto(req); 473 if (LOG.isTraceEnabled()) { 474 LOG.trace("Deserialized quota from request: " + newQuota); 475 } 476 477 // Merge the current quota settings with the new quota settings the user provided. 478 // 479 // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one 480 // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type. 481 GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota); 482 if (LOG.isTraceEnabled()) { 483 LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota); 484 } 485 486 // Submit new changes 487 if (mergedQuota == null) { 488 quotaOps.delete(); 489 } else { 490 quotaOps.update(mergedQuota); 491 } 492 // Advertise the final result via the "post" CP hook 493 quotaOps.postApply(mergedQuota); 494 } 495 496 public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException { 497 if (initialized) { 498 namespaceQuotaManager.checkQuotaToCreateTable(tName, regions); 499 } 500 } 501 502 public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException { 503 if (initialized) { 504 namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions); 505 } 506 } 507 508 /** 509 * @return cached region count, or -1 if quota manager is disabled or table status not found 510 */ 511 public int getRegionCountOfTable(TableName tName) throws IOException { 512 if (initialized) { 513 return namespaceQuotaManager.getRegionCountOfTable(tName); 514 } 515 return -1; 516 } 517 518 @Override 519 public void onRegionMerged(RegionInfo mergedRegion) throws IOException { 520 if (initialized) { 521 namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion); 522 } 523 } 524 525 @Override 526 public void onRegionSplit(RegionInfo hri) throws IOException { 527 if (initialized) { 528 namespaceQuotaManager.checkQuotaToSplitRegion(hri); 529 } 530 } 531 532 /** 533 * Remove table from namespace quota. 534 * 535 * @param tName - The table name to update quota usage. 536 * @throws IOException Signals that an I/O exception has occurred. 537 */ 538 public void removeTableFromNamespaceQuota(TableName tName) throws IOException { 539 if (initialized) { 540 namespaceQuotaManager.removeFromNamespaceUsage(tName); 541 } 542 } 543 544 public NamespaceAuditor getNamespaceQuotaManager() { 545 return this.namespaceQuotaManager; 546 } 547 548 /** 549 * Encapsulates CRUD quota operations for some subject. 550 */ 551 private static interface SetQuotaOperations { 552 /** 553 * Fetches the current quota settings for the subject. 554 */ 555 GlobalQuotaSettingsImpl fetch() throws IOException; 556 /** 557 * Deletes the quota for the subject. 558 */ 559 void delete() throws IOException; 560 /** 561 * Persist the given quota for the subject. 562 */ 563 void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException; 564 /** 565 * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current 566 * quota for the subject. 567 */ 568 void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException; 569 /** 570 * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting 571 * quota from the request action for the subject. 572 */ 573 void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException; 574 } 575 576 /* ========================================================================== 577 * Helpers 578 */ 579 580 private void checkQuotaSupport() throws IOException { 581 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) { 582 throw new DoNotRetryIOException( 583 new UnsupportedOperationException("quota support disabled")); 584 } 585 if (!initialized) { 586 long maxWaitTime = masterServices.getConfiguration().getLong( 587 "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds. 588 long startTime = EnvironmentEdgeManager.currentTime(); 589 do { 590 try { 591 Thread.sleep(100); 592 } catch (InterruptedException e) { 593 LOG.warn("Interrupted while waiting for Quota Manager to be initialized."); 594 break; 595 } 596 } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime); 597 if (!initialized) { 598 throw new IOException("Quota manager is uninitialized, please retry later."); 599 } 600 } 601 } 602 603 private void createQuotaTable() throws IOException { 604 masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC); 605 } 606 607 private static class NamedLock<T> { 608 private final HashSet<T> locks = new HashSet<>(); 609 610 public void lock(final T name) throws InterruptedException { 611 synchronized (locks) { 612 while (locks.contains(name)) { 613 locks.wait(); 614 } 615 locks.add(name); 616 } 617 } 618 619 public void unlock(final T name) { 620 synchronized (locks) { 621 locks.remove(name); 622 locks.notifyAll(); 623 } 624 } 625 } 626 627 @Override 628 public void onRegionSplitReverted(RegionInfo hri) throws IOException { 629 if (initialized) { 630 this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri); 631 } 632 } 633 634 /** 635 * Holds the size of a region at the given time, millis since the epoch. 636 */ 637 private static class SizeSnapshotWithTimestamp { 638 private final long size; 639 private final long time; 640 641 public SizeSnapshotWithTimestamp(long size, long time) { 642 this.size = size; 643 this.time = time; 644 } 645 646 public long getSize() { 647 return size; 648 } 649 650 public long getTime() { 651 return time; 652 } 653 654 @Override 655 public boolean equals(Object o) { 656 if (o instanceof SizeSnapshotWithTimestamp) { 657 SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o; 658 return size == other.size && time == other.time; 659 } 660 return false; 661 } 662 663 @Override 664 public int hashCode() { 665 HashCodeBuilder hcb = new HashCodeBuilder(); 666 return hcb.append(size).append(time).toHashCode(); 667 } 668 669 @Override 670 public String toString() { 671 StringBuilder sb = new StringBuilder(32); 672 sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, "); 673 sb.append("time=").append(time).append("}"); 674 return sb.toString(); 675 } 676 } 677 678 void initializeRegionSizes() { 679 assert regionSizes == null; 680 this.regionSizes = new ConcurrentHashMap<>(); 681 } 682 683 public void addRegionSize(RegionInfo hri, long size, long time) { 684 if (regionSizes == null) { 685 return; 686 } 687 regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time)); 688 } 689 690 public Map<RegionInfo, Long> snapshotRegionSizes() { 691 if (regionSizes == null) { 692 return EMPTY_MAP; 693 } 694 695 Map<RegionInfo, Long> copy = new HashMap<>(); 696 for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) { 697 copy.put(entry.getKey(), entry.getValue().getSize()); 698 } 699 return copy; 700 } 701 702 int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) { 703 if (regionSizes == null) { 704 return 0; 705 } 706 int numEntriesRemoved = 0; 707 Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator = 708 regionSizes.entrySet().iterator(); 709 while (iterator.hasNext()) { 710 RegionInfo regionInfo = iterator.next().getKey(); 711 long currentEntryTime = regionSizes.get(regionInfo).getTime(); 712 // do not prune the entries if table is in violation and 713 // violation policy is disable to avoid cycle of enable/disable. 714 // Please refer HBASE-22012 for more details. 715 // prune entries older than time. 716 if (currentEntryTime < timeToPruneBefore && !isInViolationAndPolicyDisable( 717 regionInfo.getTable(), quotaObserverChore)) { 718 iterator.remove(); 719 numEntriesRemoved++; 720 } 721 } 722 return numEntriesRemoved; 723 } 724 725 /** 726 * Method to check if a table is in violation and policy set on table is DISABLE. 727 * 728 * @param tableName tableName to check. 729 * @param quotaObserverChore QuotaObserverChore instance 730 * @return returns true if table is in violation and policy is disable else false. 731 */ 732 private boolean isInViolationAndPolicyDisable(TableName tableName, 733 QuotaObserverChore quotaObserverChore) { 734 boolean isInViolationAtTable = false; 735 boolean isInViolationAtNamespace = false; 736 SpaceViolationPolicy tablePolicy = null; 737 SpaceViolationPolicy namespacePolicy = null; 738 // Get Current Snapshot for the given table 739 SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName); 740 SpaceQuotaSnapshot namespaceQuotaSnapshot = 741 quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString()); 742 if (tableQuotaSnapshot != null) { 743 // check if table in violation 744 isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation(); 745 Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy(); 746 if (policy.isPresent()) { 747 tablePolicy = policy.get(); 748 } 749 } 750 if (namespaceQuotaSnapshot != null) { 751 // check namespace in violation 752 isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation(); 753 Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy(); 754 if (policy.isPresent()) { 755 namespacePolicy = policy.get(); 756 } 757 } 758 return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable) || ( 759 namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace); 760 } 761 762 /** 763 * Removes each region size entry where the RegionInfo references the provided TableName. 764 * 765 * @param tableName tableName. 766 */ 767 public void removeRegionSizesForTable(TableName tableName) { 768 regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName)); 769 } 770 771 public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn, 772 Configuration conf, FileSystem fs) throws IOException { 773 final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create(); 774 // Group the archived files by table 775 for (FileWithSize fileWithSize : request.getArchivedFilesList()) { 776 TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName()); 777 archivedFilesByTable.put( 778 tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize())); 779 } 780 if (LOG.isTraceEnabled()) { 781 LOG.trace("Grouped archived files by table: " + archivedFilesByTable); 782 } 783 // Report each set of files to the appropriate object 784 for (TableName tn : archivedFilesByTable.keySet()) { 785 final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn); 786 final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get( 787 conn, conf, fs, tn); 788 notifier.addArchivedFiles(filesWithSize); 789 } 790 } 791} 792