001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.IOException; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031 032import org.apache.commons.lang3.builder.HashCodeBuilder; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.hadoop.hbase.MetaTableAccessor; 037import org.apache.hadoop.hbase.NamespaceDescriptor; 038import org.apache.hadoop.hbase.RegionStateListener; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.RegionInfo; 042import org.apache.hadoop.hbase.master.MasterServices; 043import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; 044import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure; 045import org.apache.hadoop.hbase.namespace.NamespaceAuditor; 046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.yetus.audience.InterfaceStability; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 054import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 055import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 056import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 057 058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize; 069 070/** 071 * Master Quota Manager. 072 * It is responsible for initialize the quota table on the first-run and 073 * provide the admin operations to interact with the quota table. 074 * 075 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes 076 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER. 077 */ 078@InterfaceAudience.Private 079@InterfaceStability.Evolving 080public class MasterQuotaManager implements RegionStateListener { 081 private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class); 082 private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap( 083 new HashMap<>()); 084 085 private final MasterServices masterServices; 086 private NamedLock<String> namespaceLocks; 087 private NamedLock<TableName> tableLocks; 088 private NamedLock<String> userLocks; 089 private NamedLock<String> regionServerLocks; 090 private boolean initialized = false; 091 private NamespaceAuditor namespaceQuotaManager; 092 private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes; 093 // Storage for quota rpc throttle 094 private RpcThrottleStorage rpcThrottleStorage; 095 096 public MasterQuotaManager(final MasterServices masterServices) { 097 this.masterServices = masterServices; 098 } 099 100 public void start() throws IOException { 101 // If the user doesn't want the quota support skip all the initializations. 102 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) { 103 LOG.info("Quota support disabled"); 104 return; 105 } 106 107 // Create the quota table if missing 108 if (!MetaTableAccessor.tableExists(masterServices.getConnection(), 109 QuotaUtil.QUOTA_TABLE_NAME)) { 110 LOG.info("Quota table not found. Creating..."); 111 createQuotaTable(); 112 } 113 114 LOG.info("Initializing quota support"); 115 namespaceLocks = new NamedLock<>(); 116 tableLocks = new NamedLock<>(); 117 userLocks = new NamedLock<>(); 118 regionServerLocks = new NamedLock<>(); 119 regionSizes = new ConcurrentHashMap<>(); 120 121 namespaceQuotaManager = new NamespaceAuditor(masterServices); 122 namespaceQuotaManager.start(); 123 initialized = true; 124 125 rpcThrottleStorage = 126 new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration()); 127 } 128 129 public void stop() { 130 } 131 132 public boolean isQuotaInitialized() { 133 return initialized && namespaceQuotaManager.isInitialized(); 134 } 135 136 /* ========================================================================== 137 * Admin operations to manage the quota table 138 */ 139 public SetQuotaResponse setQuota(final SetQuotaRequest req) 140 throws IOException, InterruptedException { 141 checkQuotaSupport(); 142 143 if (req.hasUserName()) { 144 userLocks.lock(req.getUserName()); 145 try { 146 if (req.hasTableName()) { 147 setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req); 148 } else if (req.hasNamespace()) { 149 setUserQuota(req.getUserName(), req.getNamespace(), req); 150 } else { 151 setUserQuota(req.getUserName(), req); 152 } 153 } finally { 154 userLocks.unlock(req.getUserName()); 155 } 156 } else if (req.hasTableName()) { 157 TableName table = ProtobufUtil.toTableName(req.getTableName()); 158 tableLocks.lock(table); 159 try { 160 setTableQuota(table, req); 161 } finally { 162 tableLocks.unlock(table); 163 } 164 } else if (req.hasNamespace()) { 165 namespaceLocks.lock(req.getNamespace()); 166 try { 167 setNamespaceQuota(req.getNamespace(), req); 168 } finally { 169 namespaceLocks.unlock(req.getNamespace()); 170 } 171 } else if (req.hasRegionServer()) { 172 regionServerLocks.lock(req.getRegionServer()); 173 try { 174 setRegionServerQuota(req.getRegionServer(), req); 175 } finally { 176 regionServerLocks.unlock(req.getRegionServer()); 177 } 178 } else { 179 throw new DoNotRetryIOException(new UnsupportedOperationException( 180 "a user, a table, a namespace or region server must be specified")); 181 } 182 return SetQuotaResponse.newBuilder().build(); 183 } 184 185 public void setUserQuota(final String userName, final SetQuotaRequest req) 186 throws IOException, InterruptedException { 187 setQuota(req, new SetQuotaOperations() { 188 @Override 189 public GlobalQuotaSettingsImpl fetch() throws IOException { 190 return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null, 191 QuotaUtil.getUserQuota(masterServices.getConnection(), userName)); 192 } 193 @Override 194 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 195 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas()); 196 } 197 @Override 198 public void delete() throws IOException { 199 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName); 200 } 201 @Override 202 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 203 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo); 204 } 205 @Override 206 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 207 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo); 208 } 209 }); 210 } 211 212 public void setUserQuota(final String userName, final TableName table, 213 final SetQuotaRequest req) throws IOException, InterruptedException { 214 setQuota(req, new SetQuotaOperations() { 215 @Override 216 public GlobalQuotaSettingsImpl fetch() throws IOException { 217 return new GlobalQuotaSettingsImpl(userName, table, null, null, 218 QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table)); 219 } 220 @Override 221 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 222 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, 223 quotaPojo.toQuotas()); 224 } 225 @Override 226 public void delete() throws IOException { 227 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table); 228 } 229 @Override 230 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 231 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo); 232 } 233 @Override 234 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 235 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo); 236 } 237 }); 238 } 239 240 public void setUserQuota(final String userName, final String namespace, 241 final SetQuotaRequest req) throws IOException, InterruptedException { 242 setQuota(req, new SetQuotaOperations() { 243 @Override 244 public GlobalQuotaSettingsImpl fetch() throws IOException { 245 return new GlobalQuotaSettingsImpl(userName, null, namespace, null, 246 QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace)); 247 } 248 @Override 249 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 250 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, 251 quotaPojo.toQuotas()); 252 } 253 @Override 254 public void delete() throws IOException { 255 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace); 256 } 257 @Override 258 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 259 masterServices.getMasterCoprocessorHost().preSetUserQuota( 260 userName, namespace, quotaPojo); 261 } 262 @Override 263 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 264 masterServices.getMasterCoprocessorHost().postSetUserQuota( 265 userName, namespace, quotaPojo); 266 } 267 }); 268 } 269 270 public void setTableQuota(final TableName table, final SetQuotaRequest req) 271 throws IOException, InterruptedException { 272 setQuota(req, new SetQuotaOperations() { 273 @Override 274 public GlobalQuotaSettingsImpl fetch() throws IOException { 275 return new GlobalQuotaSettingsImpl(null, table, null, null, 276 QuotaUtil.getTableQuota(masterServices.getConnection(), table)); 277 } 278 @Override 279 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 280 QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas()); 281 } 282 @Override 283 public void delete() throws IOException { 284 SpaceQuotaSnapshot currSnapshotOfTable = 285 QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table); 286 QuotaUtil.deleteTableQuota(masterServices.getConnection(), table); 287 if (currSnapshotOfTable != null) { 288 SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus(); 289 if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null) 290 && quotaStatus.isInViolation()) { 291 QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table); 292 } 293 } 294 } 295 @Override 296 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 297 masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo); 298 } 299 @Override 300 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 301 masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo); 302 } 303 }); 304 } 305 306 public void setNamespaceQuota(final String namespace, final SetQuotaRequest req) 307 throws IOException, InterruptedException { 308 setQuota(req, new SetQuotaOperations() { 309 @Override 310 public GlobalQuotaSettingsImpl fetch() throws IOException { 311 return new GlobalQuotaSettingsImpl(null, null, namespace, null, 312 QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace)); 313 } 314 @Override 315 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 316 QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, 317 quotaPojo.toQuotas()); 318 } 319 @Override 320 public void delete() throws IOException { 321 QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace); 322 } 323 @Override 324 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 325 masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo); 326 } 327 @Override 328 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 329 masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo); 330 } 331 }); 332 } 333 334 public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req) 335 throws IOException, InterruptedException { 336 setQuota(req, new SetQuotaOperations() { 337 @Override 338 public GlobalQuotaSettingsImpl fetch() throws IOException { 339 return new GlobalQuotaSettingsImpl(null, null, null, regionServer, 340 QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer)); 341 } 342 343 @Override 344 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 345 QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer, 346 quotaPojo.toQuotas()); 347 } 348 349 @Override 350 public void delete() throws IOException { 351 QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer); 352 } 353 354 @Override 355 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 356 masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo); 357 } 358 359 @Override 360 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException { 361 masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo); 362 } 363 }); 364 } 365 366 public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException { 367 if (initialized) { 368 this.namespaceQuotaManager.addNamespace(desc); 369 } 370 } 371 372 public void removeNamespaceQuota(String namespace) throws IOException { 373 if (initialized) { 374 this.namespaceQuotaManager.deleteNamespace(namespace); 375 } 376 } 377 378 public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request) 379 throws IOException { 380 boolean rpcThrottle = request.getRpcThrottleEnabled(); 381 if (initialized) { 382 masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle); 383 boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled(); 384 if (rpcThrottle != oldRpcThrottle) { 385 LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(), 386 oldRpcThrottle, rpcThrottle); 387 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 388 SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage, 389 rpcThrottle, masterServices.getServerName(), latch); 390 masterServices.getMasterProcedureExecutor().submitProcedure(procedure); 391 latch.await(); 392 } else { 393 LOG.warn("Skip switch rpc throttle to {} because it's the same with old value", 394 rpcThrottle); 395 } 396 SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder() 397 .setPreviousRpcThrottleEnabled(oldRpcThrottle).build(); 398 masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle); 399 return response; 400 } else { 401 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle); 402 return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build(); 403 } 404 } 405 406 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request) 407 throws IOException { 408 if (initialized) { 409 masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled(); 410 boolean enabled = isRpcThrottleEnabled(); 411 IsRpcThrottleEnabledResponse response = 412 IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build(); 413 masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled); 414 return response; 415 } else { 416 LOG.warn("Skip get rpc throttle because rpc quota is disabled"); 417 return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build(); 418 } 419 } 420 421 public boolean isRpcThrottleEnabled() throws IOException { 422 return initialized ? rpcThrottleStorage.isRpcThrottleEnabled() : false; 423 } 424 425 public SwitchExceedThrottleQuotaResponse 426 switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException { 427 boolean enabled = request.getExceedThrottleQuotaEnabled(); 428 if (initialized) { 429 masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled); 430 boolean previousEnabled = 431 QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection()); 432 if (previousEnabled == enabled) { 433 LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value", 434 enabled); 435 } else { 436 QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled); 437 LOG.info("{} switch exceed throttle quota from {} to {}", 438 masterServices.getClientIdAuditPrefix(), previousEnabled, enabled); 439 } 440 SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder() 441 .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build(); 442 masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled, 443 enabled); 444 return response; 445 } else { 446 LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled); 447 return SwitchExceedThrottleQuotaResponse.newBuilder() 448 .setPreviousExceedThrottleQuotaEnabled(false).build(); 449 } 450 } 451 452 public boolean isExceedThrottleQuotaEnabled() throws IOException { 453 return initialized ? QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection()) 454 : false; 455 } 456 457 private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps) 458 throws IOException, InterruptedException { 459 if (req.hasRemoveAll() && req.getRemoveAll() == true) { 460 quotaOps.preApply(null); 461 quotaOps.delete(); 462 quotaOps.postApply(null); 463 return; 464 } 465 466 // Apply quota changes 467 GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch(); 468 if (LOG.isTraceEnabled()) { 469 LOG.trace( 470 "Current quota for request(" + TextFormat.shortDebugString(req) 471 + "): " + currentQuota); 472 } 473 // Call the appropriate "pre" CP hook with the current quota value (may be null) 474 quotaOps.preApply(currentQuota); 475 // Translate the protobuf request back into a POJO 476 QuotaSettings newQuota = QuotaSettings.buildFromProto(req); 477 if (LOG.isTraceEnabled()) { 478 LOG.trace("Deserialized quota from request: " + newQuota); 479 } 480 481 // Merge the current quota settings with the new quota settings the user provided. 482 // 483 // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one 484 // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type. 485 GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota); 486 if (LOG.isTraceEnabled()) { 487 LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota); 488 } 489 490 // Submit new changes 491 if (mergedQuota == null) { 492 quotaOps.delete(); 493 } else { 494 quotaOps.update(mergedQuota); 495 } 496 // Advertise the final result via the "post" CP hook 497 quotaOps.postApply(mergedQuota); 498 } 499 500 public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException { 501 if (initialized) { 502 namespaceQuotaManager.checkQuotaToCreateTable(tName, regions); 503 } 504 } 505 506 public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException { 507 if (initialized) { 508 namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions); 509 } 510 } 511 512 /** 513 * @return cached region count, or -1 if quota manager is disabled or table status not found 514 */ 515 public int getRegionCountOfTable(TableName tName) throws IOException { 516 if (initialized) { 517 return namespaceQuotaManager.getRegionCountOfTable(tName); 518 } 519 return -1; 520 } 521 522 @Override 523 public void onRegionMerged(RegionInfo mergedRegion) throws IOException { 524 if (initialized) { 525 namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion); 526 } 527 } 528 529 @Override 530 public void onRegionSplit(RegionInfo hri) throws IOException { 531 if (initialized) { 532 namespaceQuotaManager.checkQuotaToSplitRegion(hri); 533 } 534 } 535 536 /** 537 * Remove table from namespace quota. 538 * 539 * @param tName - The table name to update quota usage. 540 * @throws IOException Signals that an I/O exception has occurred. 541 */ 542 public void removeTableFromNamespaceQuota(TableName tName) throws IOException { 543 if (initialized) { 544 namespaceQuotaManager.removeFromNamespaceUsage(tName); 545 } 546 } 547 548 public NamespaceAuditor getNamespaceQuotaManager() { 549 return this.namespaceQuotaManager; 550 } 551 552 /** 553 * Encapsulates CRUD quota operations for some subject. 554 */ 555 private static interface SetQuotaOperations { 556 /** 557 * Fetches the current quota settings for the subject. 558 */ 559 GlobalQuotaSettingsImpl fetch() throws IOException; 560 /** 561 * Deletes the quota for the subject. 562 */ 563 void delete() throws IOException; 564 /** 565 * Persist the given quota for the subject. 566 */ 567 void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException; 568 /** 569 * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current 570 * quota for the subject. 571 */ 572 void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException; 573 /** 574 * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting 575 * quota from the request action for the subject. 576 */ 577 void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException; 578 } 579 580 /* ========================================================================== 581 * Helpers 582 */ 583 584 private void checkQuotaSupport() throws IOException { 585 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) { 586 throw new DoNotRetryIOException( 587 new UnsupportedOperationException("quota support disabled")); 588 } 589 if (!initialized) { 590 long maxWaitTime = masterServices.getConfiguration().getLong( 591 "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds. 592 long startTime = EnvironmentEdgeManager.currentTime(); 593 do { 594 try { 595 Thread.sleep(100); 596 } catch (InterruptedException e) { 597 LOG.warn("Interrupted while waiting for Quota Manager to be initialized."); 598 break; 599 } 600 } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime); 601 if (!initialized) { 602 throw new IOException("Quota manager is uninitialized, please retry later."); 603 } 604 } 605 } 606 607 private void createQuotaTable() throws IOException { 608 masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC); 609 } 610 611 private static class NamedLock<T> { 612 private final HashSet<T> locks = new HashSet<>(); 613 614 public void lock(final T name) throws InterruptedException { 615 synchronized (locks) { 616 while (locks.contains(name)) { 617 locks.wait(); 618 } 619 locks.add(name); 620 } 621 } 622 623 public void unlock(final T name) { 624 synchronized (locks) { 625 locks.remove(name); 626 locks.notifyAll(); 627 } 628 } 629 } 630 631 @Override 632 public void onRegionSplitReverted(RegionInfo hri) throws IOException { 633 if (initialized) { 634 this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri); 635 } 636 } 637 638 /** 639 * Holds the size of a region at the given time, millis since the epoch. 640 */ 641 private static class SizeSnapshotWithTimestamp { 642 private final long size; 643 private final long time; 644 645 public SizeSnapshotWithTimestamp(long size, long time) { 646 this.size = size; 647 this.time = time; 648 } 649 650 public long getSize() { 651 return size; 652 } 653 654 public long getTime() { 655 return time; 656 } 657 658 @Override 659 public boolean equals(Object o) { 660 if (o instanceof SizeSnapshotWithTimestamp) { 661 SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o; 662 return size == other.size && time == other.time; 663 } 664 return false; 665 } 666 667 @Override 668 public int hashCode() { 669 HashCodeBuilder hcb = new HashCodeBuilder(); 670 return hcb.append(size).append(time).toHashCode(); 671 } 672 673 @Override 674 public String toString() { 675 StringBuilder sb = new StringBuilder(32); 676 sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, "); 677 sb.append("time=").append(time).append("}"); 678 return sb.toString(); 679 } 680 } 681 682 @VisibleForTesting 683 void initializeRegionSizes() { 684 assert regionSizes == null; 685 this.regionSizes = new ConcurrentHashMap<>(); 686 } 687 688 public void addRegionSize(RegionInfo hri, long size, long time) { 689 if (regionSizes == null) { 690 return; 691 } 692 regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time)); 693 } 694 695 public Map<RegionInfo, Long> snapshotRegionSizes() { 696 if (regionSizes == null) { 697 return EMPTY_MAP; 698 } 699 700 Map<RegionInfo, Long> copy = new HashMap<>(); 701 for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) { 702 copy.put(entry.getKey(), entry.getValue().getSize()); 703 } 704 return copy; 705 } 706 707 int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) { 708 if (regionSizes == null) { 709 return 0; 710 } 711 int numEntriesRemoved = 0; 712 Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator = 713 regionSizes.entrySet().iterator(); 714 while (iterator.hasNext()) { 715 RegionInfo regionInfo = iterator.next().getKey(); 716 long currentEntryTime = regionSizes.get(regionInfo).getTime(); 717 // do not prune the entries if table is in violation and 718 // violation policy is disable to avoid cycle of enable/disable. 719 // Please refer HBASE-22012 for more details. 720 // prune entries older than time. 721 if (currentEntryTime < timeToPruneBefore && !isInViolationAndPolicyDisable( 722 regionInfo.getTable(), quotaObserverChore)) { 723 iterator.remove(); 724 numEntriesRemoved++; 725 } 726 } 727 return numEntriesRemoved; 728 } 729 730 /** 731 * Method to check if a table is in violation and policy set on table is DISABLE. 732 * 733 * @param tableName tableName to check. 734 * @param quotaObserverChore QuotaObserverChore instance 735 * @return returns true if table is in violation and policy is disable else false. 736 */ 737 private boolean isInViolationAndPolicyDisable(TableName tableName, 738 QuotaObserverChore quotaObserverChore) { 739 boolean isInViolationAtTable = false; 740 boolean isInViolationAtNamespace = false; 741 SpaceViolationPolicy tablePolicy = null; 742 SpaceViolationPolicy namespacePolicy = null; 743 // Get Current Snapshot for the given table 744 SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName); 745 SpaceQuotaSnapshot namespaceQuotaSnapshot = 746 quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString()); 747 if (tableQuotaSnapshot != null) { 748 // check if table in violation 749 isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation(); 750 Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy(); 751 if (policy.isPresent()) { 752 tablePolicy = policy.get(); 753 } 754 } 755 if (namespaceQuotaSnapshot != null) { 756 // check namespace in violation 757 isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation(); 758 Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy(); 759 if (policy.isPresent()) { 760 namespacePolicy = policy.get(); 761 } 762 } 763 return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable) || ( 764 namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace); 765 } 766 767 /** 768 * Removes each region size entry where the RegionInfo references the provided TableName. 769 * 770 * @param tableName tableName. 771 */ 772 public void removeRegionSizesForTable(TableName tableName) { 773 regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName)); 774 } 775 776 public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn, 777 Configuration conf, FileSystem fs) throws IOException { 778 final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create(); 779 // Group the archived files by table 780 for (FileWithSize fileWithSize : request.getArchivedFilesList()) { 781 TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName()); 782 archivedFilesByTable.put( 783 tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize())); 784 } 785 if (LOG.isTraceEnabled()) { 786 LOG.trace("Grouped archived files by table: " + archivedFilesByTable); 787 } 788 // Report each set of files to the appropriate object 789 for (TableName tn : archivedFilesByTable.keySet()) { 790 final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn); 791 final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get( 792 conn, conf, fs, tn); 793 notifier.addArchivedFiles(filesWithSize); 794 } 795 } 796} 797