/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;

/**
 * Master Quota Manager.
 * It is responsible for initializing the quota table on the first run and
 * for providing the admin operations used to interact with the quota table.
 *
 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterQuotaManager implements RegionStateListener {
  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
  private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
      new HashMap<>());

  private final MasterServices masterServices;
  private NamedLock<String> namespaceLocks;
  private NamedLock<TableName> tableLocks;
  private NamedLock<String> userLocks;
  // volatile: set at the end of start() and polled in the wait loop of checkQuotaSupport().
  // The volatile write also publishes the lock/map fields assigned in start() before it.
  private volatile boolean initialized = false;
  private NamespaceAuditor namespaceQuotaManager;
  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;

  /**
   * Creates the manager. No quota state is set up until {@link #start()} is called.
   *
   * @param masterServices the master services used for configuration, connection and
   *          coprocessor host access
   */
  public MasterQuotaManager(final MasterServices masterServices) {
    this.masterServices = masterServices;
  }

  /**
   * Initializes quota support: creates the quota table when it does not exist yet, sets up the
   * per-subject locks and the region size map, and starts the namespace auditor. Does nothing
   * (besides logging) when quota support is disabled in the configuration.
   *
   * @throws IOException if checking for or creating the quota table, or starting the namespace
   *           auditor, fails
   */
  public void start() throws IOException {
    // If the user doesn't want the quota support skip all the initializations.
    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
      LOG.info("Quota support disabled");
      return;
    }

    // Create the quota table if missing
    if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
          QuotaUtil.QUOTA_TABLE_NAME)) {
      LOG.info("Quota table not found. Creating...");
      createQuotaTable();
    }

    LOG.info("Initializing quota support");
    namespaceLocks = new NamedLock<>();
    tableLocks = new NamedLock<>();
    userLocks = new NamedLock<>();
    regionSizes = new ConcurrentHashMap<>();

    namespaceQuotaManager = new NamespaceAuditor(masterServices);
    namespaceQuotaManager.start();
    // Must stay the last statement: readers that observe 'true' rely on everything above.
    initialized = true;
  }

  /**
   * Currently a no-op.
   */
  public void stop() {
  }

  /**
   * @return true if {@link #start()} completed with quota support enabled and the namespace
   *         auditor reports itself as initialized
   */
  public boolean isQuotaInitialized() {
    return initialized && namespaceQuotaManager.isInitialized();
  }

  /* ==========================================================================
   *  Admin operations to manage the quota table
   */

  /**
   * Applies a quota request. The subject is chosen in this order: user (optionally scoped to a
   * table or a namespace), then table, then namespace. The matching named lock is held while
   * the request is applied.
   *
   * @param req the quota request
   * @return an empty response
   * @throws IOException if quota support is disabled or not initialized in time, if the request
   *           names no user, table or namespace, or if applying the quota fails
   * @throws InterruptedException if interrupted while waiting for the subject's lock
   */
  public SetQuotaResponse setQuota(final SetQuotaRequest req)
      throws IOException, InterruptedException {
    checkQuotaSupport();

    if (req.hasUserName()) {
      userLocks.lock(req.getUserName());
      try {
        if (req.hasTableName()) {
          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
        } else if (req.hasNamespace()) {
          setUserQuota(req.getUserName(), req.getNamespace(), req);
        } else {
          setUserQuota(req.getUserName(), req);
        }
      } finally {
        userLocks.unlock(req.getUserName());
      }
    } else if (req.hasTableName()) {
      TableName table = ProtobufUtil.toTableName(req.getTableName());
      tableLocks.lock(table);
      try {
        setTableQuota(table, req);
      } finally {
        tableLocks.unlock(table);
      }
    } else if (req.hasNamespace()) {
      namespaceLocks.lock(req.getNamespace());
      try {
        setNamespaceQuota(req.getNamespace(), req);
      } finally {
        namespaceLocks.unlock(req.getNamespace());
      }
    } else {
      throw new DoNotRetryIOException(
        new UnsupportedOperationException("a user, a table or a namespace must be specified"));
    }
    return SetQuotaResponse.newBuilder().build();
  }

  /**
   * Applies a quota request to a user's global quota.
   *
   * @param userName the user the quota applies to
   * @param req the quota request
   */
  public void setUserQuota(final String userName, final SetQuotaRequest req)
      throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        // Use the userName parameter for both the settings subject and the lookup, like the
        // table/namespace-scoped overloads do.
        return new GlobalQuotaSettingsImpl(userName, null, null, QuotaUtil.getUserQuota(
            masterServices.getConnection(), userName));
      }
      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
      }
      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
      }
      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
      }
      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
      }
    });
  }

  /**
   * Applies a quota request to a user's quota on a specific table.
   *
   * @param userName the user the quota applies to
   * @param table the table the quota is scoped to
   * @param req the quota request
   */
  public void setUserQuota(final String userName, final TableName table,
      final SetQuotaRequest req) throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(userName, table, null, QuotaUtil.getUserQuota(
            masterServices.getConnection(), userName, table));
      }
      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
            quotaPojo.toQuotas());
      }
      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
      }
      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
      }
      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
      }
    });
  }

  /**
   * Applies a quota request to a user's quota on a specific namespace.
   *
   * @param userName the user the quota applies to
   * @param namespace the namespace the quota is scoped to
   * @param req the quota request
   */
  public void setUserQuota(final String userName, final String namespace,
      final SetQuotaRequest req) throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(userName, null, namespace, QuotaUtil.getUserQuota(
            masterServices.getConnection(), userName, namespace));
      }
      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
            quotaPojo.toQuotas());
      }
      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
      }
      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(
            userName, namespace, quotaPojo);
      }
      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(
            userName, namespace, quotaPojo);
      }
    });
  }

  /**
   * Applies a quota request to a table's quota. When the quota is deleted and the table's
   * current space quota snapshot shows a violated DISABLE policy, the table is re-enabled.
   *
   * @param table the table the quota applies to
   * @param req the quota request
   */
  public void setTableQuota(final TableName table, final SetQuotaRequest req)
      throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, table, null, QuotaUtil.getTableQuota(
            masterServices.getConnection(), table));
      }
      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
      }
      @Override
      public void delete() throws IOException {
        // Read the snapshot before deleting the quota; it is consulted afterwards.
        SpaceQuotaSnapshot currSnapshotOfTable =
            QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
        if (currSnapshotOfTable != null) {
          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
          if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy()
              && quotaStatus.isInViolation()) {
            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
          }
        }
      }
      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
      }
      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
      }
    });
  }

  /**
   * Applies a quota request to a namespace's quota.
   *
   * @param namespace the namespace the quota applies to
   * @param req the quota request
   */
  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
      throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, null, namespace, QuotaUtil.getNamespaceQuota(
                masterServices.getConnection(), namespace));
      }
      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
            quotaPojo.toQuotas());
      }
      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
      }
      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
      }
      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
      }
    });
  }

  /**
   * Registers a namespace with the namespace auditor. No-op when quota support has not been
   * initialized.
   */
  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.addNamespace(desc);
    }
  }

  /**
   * Removes a namespace from the namespace auditor. No-op when quota support has not been
   * initialized.
   */
  public void removeNamespaceQuota(String namespace) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.deleteNamespace(namespace);
    }
  }

  /**
   * Shared driver for all subjects. A "remove all" request deletes the quota between the pre
   * and post hooks (both invoked with {@code null}). Otherwise the current quota is fetched,
   * merged with the requested settings, and the result is persisted, or deleted when the merge
   * yields {@code null}.
   */
  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
      throws IOException, InterruptedException {
    if (req.hasRemoveAll() && req.getRemoveAll()) {
      quotaOps.preApply(null);
      quotaOps.delete();
      quotaOps.postApply(null);
      return;
    }

    // Apply quota changes
    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
    if (LOG.isTraceEnabled()) {
      // Guard kept: shortDebugString() is evaluated eagerly as an argument.
      LOG.trace("Current quota for request({}): {}", TextFormat.shortDebugString(req),
          currentQuota);
    }
    // Call the appropriate "pre" CP hook with the current quota value
    quotaOps.preApply(currentQuota);
    // Translate the protobuf request back into a POJO
    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
    LOG.trace("Deserialized quota from request: {}", newQuota);

    // Merge the current quota settings with the new quota settings the user provided.
    //
    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
    LOG.trace("Computed merged quota from current quota and user request: {}", mergedQuota);

    // Submit new changes
    if (mergedQuota == null) {
      quotaOps.delete();
    } else {
      quotaOps.update(mergedQuota);
    }
    // Advertise the final result via the "post" CP hook
    quotaOps.postApply(mergedQuota);
  }

  /**
   * Delegates to the namespace auditor's table-creation quota check. No-op when quota support
   * has not been initialized.
   */
  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
    }
  }

  /**
   * Delegates to the namespace auditor's region-update quota check. No-op when quota support
   * has not been initialized.
   */
  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
    }
  }

  /**
   * @return cached region count, or -1 if quota manager is disabled or table status not found
   */
  public int getRegionCountOfTable(TableName tName) throws IOException {
    if (initialized) {
      return namespaceQuotaManager.getRegionCountOfTable(tName);
    }
    return -1;
  }

  @Override
  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
    if (initialized) {
      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
    }
  }

  @Override
  public void onRegionSplit(RegionInfo hri) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
    }
  }

  /**
   * Remove table from namespace quota.
   *
   * @param tName - The table name to update quota usage.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
    if (initialized) {
      namespaceQuotaManager.removeFromNamespaceUsage(tName);
    }
  }

  /**
   * @return the namespace auditor, or {@code null} if {@link #start()} has not created it
   */
  public NamespaceAuditor getNamespaceQuotaManager() {
    return this.namespaceQuotaManager;
  }

  /**
   * Encapsulates CRUD quota operations for some subject.
   */
  private interface SetQuotaOperations {
    /**
     * Fetches the current quota settings for the subject.
     */
    GlobalQuotaSettingsImpl fetch() throws IOException;
    /**
     * Deletes the quota for the subject.
     */
    void delete() throws IOException;
    /**
     * Persist the given quota for the subject.
     */
    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
    /**
     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
     * quota for the subject.
     */
    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
    /**
     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
     * quota from the request action for the subject.
     */
    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
  }

  /* ==========================================================================
   *  Helpers
   */

  /**
   * Verifies that quota support is enabled and initialized. When not yet initialized, polls
   * every 100 ms for up to {@code hbase.master.wait.for.quota.manager.init} milliseconds
   * (default 30000) before giving up.
   *
   * @throws DoNotRetryIOException if quota support is disabled in the configuration
   * @throws IOException if the manager is still uninitialized after waiting, or the wait was
   *           interrupted before initialization completed
   */
  private void checkQuotaSupport() throws IOException {
    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
      throw new DoNotRetryIOException(
        new UnsupportedOperationException("quota support disabled"));
    }
    if (!initialized) {
      long maxWaitTime = masterServices.getConfiguration().getLong(
        "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
      long startTime = EnvironmentEdgeManager.currentTime();
      do {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
          // Restore the interrupt status so callers further up can still observe it.
          Thread.currentThread().interrupt();
          break;
        }
      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
      if (!initialized) {
        throw new IOException("Quota manager is uninitialized, please retry later.");
      }
    }
  }

  /**
   * Creates the quota system table through the master services.
   */
  private void createQuotaTable() throws IOException {
    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
  }

  /**
   * A set of mutually exclusive locks keyed by name. A name is "held" while it is present in
   * the set. The lock is not reentrant: locking a name that is already held blocks until
   * {@link #unlock(Object)} removes it.
   */
  private static class NamedLock<T> {
    private final HashSet<T> locks = new HashSet<>();

    /**
     * Blocks until {@code name} is not held by anyone, then marks it as held.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void lock(final T name) throws InterruptedException {
      synchronized (locks) {
        while (locks.contains(name)) {
          locks.wait();
        }
        locks.add(name);
      }
    }

    /**
     * Releases {@code name} and wakes all waiters so they can re-check their own name.
     */
    public void unlock(final T name) {
      synchronized (locks) {
        locks.remove(name);
        locks.notifyAll();
      }
    }
  }

  @Override
  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
    }
  }

  /**
   * Holds the size of a region at the given time, millis since the epoch.
   */
  private static class SizeSnapshotWithTimestamp {
    private final long size;
    private final long time;

    public SizeSnapshotWithTimestamp(long size, long time) {
      this.size = size;
      this.time = time;
    }

    public long getSize() {
      return size;
    }

    public long getTime() {
      return time;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof SizeSnapshotWithTimestamp) {
        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
        return size == other.size && time == other.time;
      }
      return false;
    }

    @Override
    public int hashCode() {
      HashCodeBuilder hcb = new HashCodeBuilder();
      return hcb.append(size).append(time).toHashCode();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(32);
      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
      sb.append("time=").append(time).append("}");
      return sb.toString();
    }
  }

  /**
   * Creates the region size map without running {@link #start()}. Asserts that the map has not
   * been created yet.
   */
  @VisibleForTesting
  void initializeRegionSizes() {
    assert regionSizes == null;
    this.regionSizes = new ConcurrentHashMap<>();
  }

  /**
   * Records the size of a region as observed at the given time, replacing any previous entry
   * for that region. Ignored when the region size map has not been created.
   *
   * @param hri the region
   * @param size the region size (rendered with a "B" suffix by the snapshot's toString)
   * @param time the observation time, millis since the epoch
   */
  public void addRegionSize(RegionInfo hri, long size, long time) {
    if (regionSizes == null) {
      return;
    }
    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
  }

  /**
   * @return a new map copy of the currently recorded region sizes, or a shared unmodifiable
   *         empty map when the region size map has not been created
   */
  public Map<RegionInfo, Long> snapshotRegionSizes() {
    if (regionSizes == null) {
      return EMPTY_MAP;
    }

    Map<RegionInfo, Long> copy = new HashMap<>();
    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
      copy.put(entry.getKey(), entry.getValue().getSize());
    }
    return copy;
  }

  /**
   * Removes every recorded region size whose timestamp is strictly older than the given time.
   *
   * @param timeToPruneBefore cut-off time, millis since the epoch
   * @return the number of entries removed (0 when the region size map has not been created)
   */
  int pruneEntriesOlderThan(long timeToPruneBefore) {
    if (regionSizes == null) {
      return 0;
    }
    int numEntriesRemoved = 0;
    Iterator<Entry<RegionInfo,SizeSnapshotWithTimestamp>> iterator =
        regionSizes.entrySet().iterator();
    while (iterator.hasNext()) {
      long currentEntryTime = iterator.next().getValue().getTime();
      if (currentEntryTime < timeToPruneBefore) {
        iterator.remove();
        numEntriesRemoved++;
      }
    }
    return numEntriesRemoved;
  }
}