001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.time.Duration; 022import java.util.ArrayList; 023import java.util.EnumSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.TimeUnit; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.ClusterMetrics; 034import org.apache.hadoop.hbase.ClusterMetrics.Option; 035import org.apache.hadoop.hbase.ScheduledChore; 036import org.apache.hadoop.hbase.Stoppable; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.RegionStatesCount; 040import org.apache.hadoop.hbase.ipc.RpcCall; 041import org.apache.hadoop.hbase.ipc.RpcServer; 042import org.apache.hadoop.hbase.regionserver.HRegionServer; 043import org.apache.hadoop.hbase.regionserver.RegionServerServices; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 046import org.apache.hadoop.security.UserGroupInformation; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.apache.yetus.audience.InterfaceStability; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 053import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 054import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 055 056/** 057 * Cache that keeps track of the quota settings for the users and tables that are interacting with 058 * it. 059 */ 060@InterfaceAudience.Private 061@InterfaceStability.Evolving 062public class QuotaCache implements Stoppable { 063 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 064 065 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 066 public static final String TABLE_REGION_STATES_CACHE_TTL_MS = 067 "hbase.quota.cache.ttl.region.states.ms"; 068 public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS = 069 "hbase.quota.cache.ttl.servers.size.ms"; 070 071 // defines the request attribute key which, when provided, will override the request's username 072 // from the perspective of user quotas 073 public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY = 074 "hbase.quota.user.override.key"; 075 private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours 076 private static final int EVICT_PERIOD_FACTOR = 5; 077 078 // for testing purpose only, enforce the cache to be always refreshed 079 static boolean TEST_FORCE_REFRESH = false; 080 // for testing purpose only, block cache refreshes to reliably verify state 081 static boolean TEST_BLOCK_REFRESH = false; 082 083 private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>(); 084 private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>(); 085 private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>(); 086 private final ConcurrentMap<String, QuotaState> regionServerQuotaCache = 087 new ConcurrentHashMap<>(); 088 private volatile boolean exceedThrottleQuotaEnabled = false; 089 // factors used to divide cluster scope quota into machine scope quota 090 private volatile double machineQuotaFactor = 1; 091 private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors = 092 new ConcurrentHashMap<>(); 093 private final RegionServerServices rsServices; 094 private final String userOverrideRequestAttributeKey; 095 096 private QuotaRefresherChore refreshChore; 097 private boolean stopped = true; 098 099 private final Fetcher<String, UserQuotaState> userQuotaStateFetcher = new Fetcher<>() { 100 @Override 101 public Get makeGet(final String user) { 102 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); 103 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); 104 return QuotaUtil.makeGetForUserQuotas(user, tables, namespaces); 105 } 106 107 @Override 108 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException { 109 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, tableMachineQuotaFactors, 110 machineQuotaFactor); 111 } 112 }; 113 114 private final Fetcher<String, QuotaState> regionServerQuotaStateFetcher = new Fetcher<>() { 115 @Override 116 public Get makeGet(final String regionServer) { 117 return QuotaUtil.makeGetForRegionServerQuotas(regionServer); 118 } 119 120 @Override 121 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 122 return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets); 123 } 124 }; 125 126 private final Fetcher<TableName, QuotaState> tableQuotaStateFetcher = new Fetcher<>() { 127 @Override 128 public Get makeGet(final TableName table) { 129 return QuotaUtil.makeGetForTableQuotas(table); 130 } 131 132 @Override 133 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 134 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, tableMachineQuotaFactors); 135 } 136 }; 137 138 private final Fetcher<String, QuotaState> namespaceQuotaStateFetcher = new Fetcher<>() { 139 @Override 140 public Get makeGet(final String namespace) { 141 return QuotaUtil.makeGetForNamespaceQuotas(namespace); 142 } 143 144 @Override 145 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 146 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, machineQuotaFactor); 147 } 148 }; 149 150 public QuotaCache(final RegionServerServices rsServices) { 151 this.rsServices = rsServices; 152 this.userOverrideRequestAttributeKey = 153 rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY); 154 } 155 156 public void start() throws IOException { 157 stopped = false; 158 159 Configuration conf = rsServices.getConfiguration(); 160 // Refresh the cache every 12 hours, and every time a quota is changed, and every time a 161 // configuration 162 // reload is triggered. Periodic reloads are kept to a minimum to avoid flooding the 163 // RegionServer 164 // holding the hbase:quota table with requests. 165 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 166 refreshChore = new QuotaRefresherChore(conf, period, this); 167 rsServices.getChoreService().scheduleChore(refreshChore); 168 } 169 170 @Override 171 public void stop(final String why) { 172 if (refreshChore != null) { 173 LOG.debug("Stopping QuotaRefresherChore chore."); 174 refreshChore.shutdown(true); 175 } 176 stopped = true; 177 } 178 179 @Override 180 public boolean isStopped() { 181 return stopped; 182 } 183 184 /** 185 * Returns the limiter associated to the specified user/table. 186 * @param ugi the user to limit 187 * @param table the table to limit 188 * @return the limiter associated to the specified user/table 189 */ 190 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 191 if (table.isSystemTable()) { 192 return NoopQuotaLimiter.get(); 193 } 194 return getUserQuotaState(ugi).getTableLimiter(table); 195 } 196 197 /** 198 * Returns the QuotaState associated to the specified user. 199 * @param ugi the user 200 * @return the quota info associated to specified user 201 */ 202 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 203 String user = getQuotaUserName(ugi); 204 if (!userQuotaCache.containsKey(user)) { 205 userQuotaCache.put(user, 206 QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); 207 fetch("user", userQuotaCache, userQuotaStateFetcher); 208 } 209 return userQuotaCache.get(user); 210 } 211 212 /** 213 * Returns the limiter associated to the specified table. 214 * @param table the table to limit 215 * @return the limiter associated to the specified table 216 */ 217 public QuotaLimiter getTableLimiter(final TableName table) { 218 if (!tableQuotaCache.containsKey(table)) { 219 tableQuotaCache.put(table, new QuotaState()); 220 fetch("table", tableQuotaCache, tableQuotaStateFetcher); 221 } 222 return tableQuotaCache.get(table).getGlobalLimiter(); 223 } 224 225 /** 226 * Returns the limiter associated to the specified namespace. 227 * @param namespace the namespace to limit 228 * @return the limiter associated to the specified namespace 229 */ 230 public QuotaLimiter getNamespaceLimiter(final String namespace) { 231 if (!namespaceQuotaCache.containsKey(namespace)) { 232 namespaceQuotaCache.put(namespace, new QuotaState()); 233 fetch("namespace", namespaceQuotaCache, namespaceQuotaStateFetcher); 234 } 235 return namespaceQuotaCache.get(namespace).getGlobalLimiter(); 236 } 237 238 /** 239 * Returns the limiter associated to the specified region server. 240 * @param regionServer the region server to limit 241 * @return the limiter associated to the specified region server 242 */ 243 public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { 244 if (!regionServerQuotaCache.containsKey(regionServer)) { 245 regionServerQuotaCache.put(regionServer, new QuotaState()); 246 fetch("regionServer", regionServerQuotaCache, regionServerQuotaStateFetcher); 247 } 248 return regionServerQuotaCache.get(regionServer).getGlobalLimiter(); 249 } 250 251 protected boolean isExceedThrottleQuotaEnabled() { 252 return exceedThrottleQuotaEnabled; 253 } 254 255 private <K, V extends QuotaState> void fetch(final String type, final Map<K, V> quotasMap, 256 final Fetcher<K, V> fetcher) { 257 // Find the quota entries to update 258 List<Get> gets = quotasMap.keySet().stream().map(fetcher::makeGet).collect(Collectors.toList()); 259 260 // fetch and update the quota entries 261 if (!gets.isEmpty()) { 262 try { 263 for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { 264 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 265 if (quotaInfo != null) { 266 quotaInfo.update(entry.getValue()); 267 } 268 269 if (LOG.isTraceEnabled()) { 270 LOG.trace("Loading {} key={} quotas={}", type, entry.getKey(), quotaInfo); 271 } 272 } 273 } catch (IOException e) { 274 LOG.warn("Unable to read {} from quota table", type, e); 275 } 276 } 277 } 278 279 /** 280 * Applies a request attribute user override if available, otherwise returns the UGI's short 281 * username 282 * @param ugi The request's UserGroupInformation 283 */ 284 private String getQuotaUserName(final UserGroupInformation ugi) { 285 if (userOverrideRequestAttributeKey == null) { 286 return ugi.getShortUserName(); 287 } 288 289 Optional<RpcCall> rpcCall = RpcServer.getCurrentCall(); 290 if (!rpcCall.isPresent()) { 291 return ugi.getShortUserName(); 292 } 293 294 byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey); 295 if (override == null) { 296 return ugi.getShortUserName(); 297 } 298 return Bytes.toString(override); 299 } 300 301 void triggerCacheRefresh() { 302 refreshChore.triggerNow(); 303 } 304 305 void forceSynchronousCacheRefresh() { 306 refreshChore.chore(); 307 } 308 309 Map<String, QuotaState> getNamespaceQuotaCache() { 310 return namespaceQuotaCache; 311 } 312 313 Map<String, QuotaState> getRegionServerQuotaCache() { 314 return regionServerQuotaCache; 315 } 316 317 Map<TableName, QuotaState> getTableQuotaCache() { 318 return tableQuotaCache; 319 } 320 321 Map<String, UserQuotaState> getUserQuotaCache() { 322 return userQuotaCache; 323 } 324 325 // TODO: Remove this once we have the notification bus 326 private class QuotaRefresherChore extends ScheduledChore { 327 // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability. 328 // So we cache the results to reduce that load. 329 private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics; 330 private final RefreshableExpiringValueCache<Integer> regionServersSize; 331 332 public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) { 333 super("QuotaRefresherChore", stoppable, period); 334 335 Duration tableRegionStatesCacheTtl = 336 Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period)); 337 this.tableRegionStatesClusterMetrics = 338 new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics", 339 tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin() 340 .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT))); 341 342 Duration regionServersSizeCacheTtl = 343 Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period)); 344 regionServersSize = 345 new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl, 346 () -> rsServices.getConnection().getAdmin().getRegionServers().size()); 347 } 348 349 @Override 350 public synchronized boolean triggerNow() { 351 tableRegionStatesClusterMetrics.invalidate(); 352 regionServersSize.invalidate(); 353 return super.triggerNow(); 354 } 355 356 @Override 357 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES", 358 justification = "I do not understand why the complaints, it looks good to me -- FIX") 359 protected void chore() { 360 while (TEST_BLOCK_REFRESH) { 361 LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false"); 362 try { 363 Thread.sleep(10); 364 } catch (InterruptedException e) { 365 throw new RuntimeException(e); 366 } 367 } 368 // Prefetch online tables/namespaces 369 for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) { 370 if (table.isSystemTable()) { 371 continue; 372 } 373 QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState()); 374 375 final String ns = table.getNamespaceAsString(); 376 377 QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState()); 378 } 379 380 QuotaCache.this.regionServerQuotaCache 381 .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState()); 382 383 updateQuotaFactors(); 384 fetchAndEvict("namespace", QuotaCache.this.namespaceQuotaCache, namespaceQuotaStateFetcher); 385 fetchAndEvict("table", QuotaCache.this.tableQuotaCache, tableQuotaStateFetcher); 386 fetchAndEvict("user", QuotaCache.this.userQuotaCache, userQuotaStateFetcher); 387 fetchAndEvict("regionServer", QuotaCache.this.regionServerQuotaCache, 388 regionServerQuotaStateFetcher); 389 fetchExceedThrottleQuota(); 390 } 391 392 private void fetchExceedThrottleQuota() { 393 try { 394 QuotaCache.this.exceedThrottleQuotaEnabled = 395 QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection()); 396 } catch (IOException e) { 397 LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e); 398 } 399 } 400 401 private <K, V extends QuotaState> void fetchAndEvict(final String type, 402 final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { 403 long now = EnvironmentEdgeManager.currentTime(); 404 long evictPeriod = getPeriod() * EVICT_PERIOD_FACTOR; 405 // Find the quota entries to update 406 List<Get> gets = new ArrayList<>(); 407 List<K> toRemove = new ArrayList<>(); 408 for (Map.Entry<K, V> entry : quotasMap.entrySet()) { 409 long lastQuery = entry.getValue().getLastQuery(); 410 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { 411 toRemove.add(entry.getKey()); 412 } else { 413 gets.add(fetcher.makeGet(entry.getKey())); 414 } 415 } 416 417 for (final K key : toRemove) { 418 if (LOG.isTraceEnabled()) { 419 LOG.trace("evict " + type + " key=" + key); 420 } 421 quotasMap.remove(key); 422 } 423 424 // fetch and update the quota entries 425 if (!gets.isEmpty()) { 426 try { 427 for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { 428 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 429 if (quotaInfo != null) { 430 quotaInfo.update(entry.getValue()); 431 } 432 433 if (LOG.isTraceEnabled()) { 434 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); 435 } 436 } 437 } catch (IOException e) { 438 LOG.warn("Unable to read " + type + " from quota table", e); 439 } 440 } 441 } 442 443 /** 444 * Update quota factors which is used to divide cluster scope quota into machine scope quota For 445 * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user 446 * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor. 447 */ 448 private void updateQuotaFactors() { 449 boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty() 450 || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters); 451 if (hasTableQuotas) { 452 updateTableMachineQuotaFactors(); 453 } else { 454 updateOnlyMachineQuotaFactors(); 455 } 456 } 457 458 /** 459 * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if 460 * we don't have any table quotas in the cache. 461 */ 462 private void updateOnlyMachineQuotaFactors() { 463 Optional<Integer> rsSize = regionServersSize.get(); 464 if (rsSize.isPresent()) { 465 updateMachineQuotaFactors(rsSize.get()); 466 } else { 467 regionServersSize.refresh(); 468 } 469 } 470 471 /** 472 * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine 473 * factors as well. This relies on a more expensive query for ClusterMetrics. 474 */ 475 private void updateTableMachineQuotaFactors() { 476 Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get(); 477 if (!clusterMetricsMaybe.isPresent()) { 478 tableRegionStatesClusterMetrics.refresh(); 479 return; 480 } 481 ClusterMetrics clusterMetrics = clusterMetricsMaybe.get(); 482 updateMachineQuotaFactors(clusterMetrics.getServersName().size()); 483 484 Map<TableName, RegionStatesCount> tableRegionStatesCount = 485 clusterMetrics.getTableRegionStatesCount(); 486 487 // Update table machine quota factors 488 for (TableName tableName : tableQuotaCache.keySet()) { 489 if (tableRegionStatesCount.containsKey(tableName)) { 490 double factor = 1; 491 try { 492 long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions(); 493 if (regionSize == 0) { 494 factor = 0; 495 } else { 496 int localRegionSize = rsServices.getRegions(tableName).size(); 497 factor = 1.0 * localRegionSize / regionSize; 498 } 499 } catch (IOException e) { 500 LOG.warn("Get table regions failed: {}", tableName, e); 501 } 502 tableMachineQuotaFactors.put(tableName, factor); 503 } else { 504 // TableName might have already been dropped (outdated) 505 tableMachineQuotaFactors.remove(tableName); 506 } 507 } 508 } 509 510 private void updateMachineQuotaFactors(int rsSize) { 511 if (rsSize != 0) { 512 // TODO if use rs group, the cluster limit should be shared by the rs group 513 machineQuotaFactor = 1.0 / rsSize; 514 } 515 } 516 } 517 518 static class RefreshableExpiringValueCache<T> { 519 private final String name; 520 private final LoadingCache<String, Optional<T>> cache; 521 522 RefreshableExpiringValueCache(String name, Duration refreshPeriod, 523 ThrowingSupplier<T> supplier) { 524 this.name = name; 525 this.cache = 526 CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS) 527 .build(new CacheLoader<>() { 528 @Override 529 public Optional<T> load(String key) { 530 try { 531 return Optional.of(supplier.get()); 532 } catch (Exception e) { 533 LOG.warn("Failed to refresh cache {}", name, e); 534 return Optional.empty(); 535 } 536 } 537 }); 538 } 539 540 Optional<T> get() { 541 return cache.getUnchecked(name); 542 } 543 544 void refresh() { 545 cache.refresh(name); 546 } 547 548 void invalidate() { 549 cache.invalidate(name); 550 } 551 } 552 553 @FunctionalInterface 554 static interface ThrowingSupplier<T> { 555 T get() throws Exception; 556 } 557 558 interface Fetcher<Key, Value> { 559 Get makeGet(Key key); 560 561 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; 562 } 563}