/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.time.Duration;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * Cache that keeps track of the quota settings for the users and tables that are interacting with
 * it.
 * <p>
 * The four quota maps are replaced wholesale by the refresh chore, so readers take a local
 * reference to the current map before using it. Entries missing from a map are created on demand
 * with default (or empty) state.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaCache implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
    "hbase.quota.cache.ttl.region.states.ms";
  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
    "hbase.quota.cache.ttl.servers.size.ms";

  // defines the request attribute key which, when provided, will override the request's username
  // from the perspective of user quotas
  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
    "hbase.quota.user.override.key";
  private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours

  private final Object initializerLock = new Object();
  private volatile boolean initialized = false;

  private volatile Map<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
  private volatile Map<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
  private volatile Map<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
  private volatile Map<String, QuotaState> regionServerQuotaCache = new ConcurrentHashMap<>();

  private volatile boolean exceedThrottleQuotaEnabled = false;
  // factors used to divide cluster scope quota into machine scope quota
  private volatile double machineQuotaFactor = 1;
  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
    new ConcurrentHashMap<>();
  private final RegionServerServices rsServices;
  private final String userOverrideRequestAttributeKey;

  private QuotaRefresherChore refreshChore;
  // volatile: written by start()/stop() and read through isStopped(), which is the Stoppable
  // handed to the refresh chore and is presumably polled from a different thread.
  private volatile boolean stopped = true;

  /**
   * Creates a cache bound to the given region server. The cache is not usable for lookups until
   * {@link #start()} has been called.
   * @param rsServices the hosting region server; its configuration supplies the optional
   *                   {@link #QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY} setting
   */
  public QuotaCache(final RegionServerServices rsServices) {
    this.rsServices = rsServices;
    this.userOverrideRequestAttributeKey =
      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
  }

  /**
   * Marks the cache as running and schedules the periodic refresh chore on the region server's
   * chore service.
   */
  public void start() throws IOException {
    stopped = false;

    Configuration conf = rsServices.getConfiguration();
    // Refresh the cache every 12 hours, and every time a quota is changed, and every time a
    // configuration reload is triggered. Periodic reloads are kept to a minimum to avoid
    // flooding the RegionServer holding the hbase:quota table with requests.
    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
    refreshChore = new QuotaRefresherChore(conf, period, this);
    rsServices.getChoreService().scheduleChore(refreshChore);
  }

  @Override
  public void stop(final String why) {
    if (refreshChore != null) {
      LOG.debug("Stopping QuotaRefresherChore chore.");
      refreshChore.shutdown(true);
    }
    stopped = true;
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }

  /**
   * Runs the refresh chore synchronously the first time any lookup is made, so that the first
   * caller sees data loaded from the quota table. The {@code initialized} flag is re-checked under
   * {@code initializerLock} so only one thread performs the initial load.
   */
  private void ensureInitialized() {
    if (!initialized) {
      synchronized (initializerLock) {
        if (!initialized) {
          refreshChore.chore();
          initialized = true;
        }
      }
    }
  }

  private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException {
    return QuotaUtil.fetchUserQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
      tableMachineQuotaFactors, machineQuotaFactor);
  }

  private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException {
    return QuotaUtil.fetchRegionServerQuotas(rsServices.getConfiguration(),
      rsServices.getConnection());
  }

  private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException {
    return QuotaUtil.fetchTableQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
      tableMachineQuotaFactors);
  }

  private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException {
    return QuotaUtil.fetchNamespaceQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
      machineQuotaFactor);
  }

  /**
   * Returns the limiter associated to the specified user/table.
   * @param ugi   the user to limit
   * @param table the table to limit
   * @return the limiter associated to the specified user/table; a no-op limiter for system tables
   */
  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
    if (table.isSystemTable()) {
      return NoopQuotaLimiter.get();
    }
    return getUserQuotaState(ugi).getTableLimiter(table);
  }

  /**
   * Returns the QuotaState associated to the specified user.
   * @param ugi the user
   * @return the quota info associated to specified user
   */
  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
    String user = getQuotaUserName(ugi);
    ensureInitialized();
    // local reference because the chore thread may assign to userQuotaCache
    Map<String, UserQuotaState> cache = userQuotaCache;
    // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent first requests for the same
    // user all share one state instead of racing to overwrite each other's entry.
    return cache.computeIfAbsent(user,
      key -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration()));
  }

  /**
   * Returns the limiter associated to the specified table.
   * @param table the table to limit
   * @return the limiter associated to the specified table
   */
  public QuotaLimiter getTableLimiter(final TableName table) {
    ensureInitialized();
    // local reference because the chore thread may assign to tableQuotaCache
    Map<TableName, QuotaState> cache = tableQuotaCache;
    // atomic create-if-missing; see getUserQuotaState
    return cache.computeIfAbsent(table, key -> new QuotaState()).getGlobalLimiter();
  }

  /**
   * Returns the limiter associated to the specified namespace.
   * @param namespace the namespace to limit
   * @return the limiter associated to the specified namespace
   */
  public QuotaLimiter getNamespaceLimiter(final String namespace) {
    ensureInitialized();
    // local reference because the chore thread may assign to namespaceQuotaCache
    Map<String, QuotaState> cache = namespaceQuotaCache;
    // atomic create-if-missing; see getUserQuotaState
    return cache.computeIfAbsent(namespace, key -> new QuotaState()).getGlobalLimiter();
  }

  /**
   * Returns the limiter associated to the specified region server.
   * @param regionServer the region server to limit
   * @return the limiter associated to the specified region server
   */
  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
    ensureInitialized();
    // local reference because the chore thread may assign to regionServerQuotaCache
    Map<String, QuotaState> cache = regionServerQuotaCache;
    // atomic create-if-missing; see getUserQuotaState
    return cache.computeIfAbsent(regionServer, key -> new QuotaState()).getGlobalLimiter();
  }

  /** Returns the exceed-throttle-quota flag as last read by the refresh chore. */
  protected boolean isExceedThrottleQuotaEnabled() {
    return exceedThrottleQuotaEnabled;
  }

  /**
   * Applies a request attribute user override if available, otherwise returns the UGI's short
   * username
   * @param ugi The request's UserGroupInformation
   * @return the override value when an override key is configured, there is a current RPC call
   *         and that call carries the attribute; the UGI's short user name in every other case
   */
  String getQuotaUserName(final UserGroupInformation ugi) {
    if (userOverrideRequestAttributeKey == null) {
      return ugi.getShortUserName();
    }

    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
    if (!rpcCall.isPresent()) {
      return ugi.getShortUserName();
    }

    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
    if (override == null) {
      return ugi.getShortUserName();
    }
    return Bytes.toString(override);
  }

  /** Invalidates the cached cluster information and asks the chore to run as soon as possible. */
  void triggerCacheRefresh() {
    refreshChore.triggerNow();
  }

  /** Runs the refresh chore on the calling thread. */
  void forceSynchronousCacheRefresh() {
    refreshChore.chore();
  }

  /** visible for testing */
  Map<String, QuotaState> getNamespaceQuotaCache() {
    return namespaceQuotaCache;
  }

  /** visible for testing */
  Map<String, QuotaState> getRegionServerQuotaCache() {
    return regionServerQuotaCache;
  }

  /** visible for testing */
  Map<TableName, QuotaState> getTableQuotaCache() {
    return tableQuotaCache;
  }

  /** visible for testing */
  Map<String, UserQuotaState> getUserQuotaCache() {
    return userQuotaCache;
  }

  // TODO: Remove this once we have the notification bus
  private class QuotaRefresherChore extends ScheduledChore {
    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
    // So we cache the results to reduce that load.
    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
    private final RefreshableExpiringValueCache<Integer> regionServersSize;

    /**
     * @param conf      source of the two cluster-information TTL settings; each defaults to
     *                  {@code period} when unset
     * @param period    chore period, passed to {@link ScheduledChore}
     * @param stoppable passed to {@link ScheduledChore}
     */
    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
      super("QuotaRefresherChore", stoppable, period);

      Duration tableRegionStatesCacheTtl =
        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
      this.tableRegionStatesClusterMetrics = new RefreshableExpiringValueCache<>(
        "tableRegionStatesClusterMetrics", tableRegionStatesCacheTtl, () -> {
          // The caller owns the Admin returned by getAdmin(), so scope it to this one query.
          try (Admin admin = rsServices.getConnection().getAdmin()) {
            return admin
              .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
          }
        });

      Duration regionServersSizeCacheTtl =
        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
      regionServersSize = new RefreshableExpiringValueCache<>("regionServersSize",
        regionServersSizeCacheTtl, () -> {
          // The caller owns the Admin returned by getAdmin(), so scope it to this one query.
          try (Admin admin = rsServices.getConnection().getAdmin()) {
            return admin.getRegionServers().size();
          }
        });
    }

    /**
     * Drops the cached cluster information before delegating, so the triggered run recomputes the
     * quota factors from fresh data.
     */
    @Override
    public synchronized boolean triggerNow() {
      tableRegionStatesClusterMetrics.invalidate();
      regionServersSize.invalidate();
      return super.triggerNow();
    }

    /**
     * Recomputes the quota factors, then reloads each of the four quota maps in turn. For every
     * map, state objects already held in the old map are carried over (updated in place) so that
     * references handed out earlier stay current. A failure to fetch one kind of quota is logged
     * and leaves that map unchanged; the remaining kinds are still refreshed.
     */
    @Override
    protected void chore() {
      synchronized (this) {
        LOG.info("Reloading quota cache from hbase:quota table");
        updateQuotaFactors();

        try {
          Map<String, UserQuotaState> newUserQuotaCache =
            new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
          updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
          userQuotaCache = newUserQuotaCache;
        } catch (IOException e) {
          LOG.error("Error while fetching user quotas", e);
        }

        try {
          Map<String, QuotaState> newRegionServerQuotaCache =
            new ConcurrentHashMap<>(fetchRegionServerQuotaStateEntries());
          updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
          regionServerQuotaCache = newRegionServerQuotaCache;
        } catch (IOException e) {
          LOG.error("Error while fetching region server quotas", e);
        }

        try {
          Map<TableName, QuotaState> newTableQuotaCache =
            new ConcurrentHashMap<>(fetchTableQuotaStateEntries());
          updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
          tableQuotaCache = newTableQuotaCache;
        } catch (IOException e) {
          LOG.error("Error while refreshing table quotas", e);
        }

        try {
          Map<String, QuotaState> newNamespaceQuotaCache =
            new ConcurrentHashMap<>(fetchNamespaceQuotaStateEntries());
          updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
          namespaceQuotaCache = newNamespaceQuotaCache;
        } catch (IOException e) {
          LOG.error("Error while refreshing namespace quotas", e);
        }

        fetchExceedThrottleQuota();
      }
    }

    /** Re-reads the exceed-throttle-quota flag; on failure the previous value is kept. */
    private void fetchExceedThrottleQuota() {
      try {
        QuotaCache.this.exceedThrottleQuotaEnabled =
          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
      } catch (IOException e) {
        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
      }
    }

    /**
     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
     */
    private void updateQuotaFactors() {
      boolean hasTableQuotas = !tableQuotaCache.isEmpty()
        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
      if (hasTableQuotas) {
        updateTableMachineQuotaFactors();
      } else {
        updateOnlyMachineQuotaFactors();
      }
    }

    /**
     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
     * we don't have any table quotas in the cache.
     */
    private void updateOnlyMachineQuotaFactors() {
      Optional<Integer> rsSize = regionServersSize.get();
      if (rsSize.isPresent()) {
        updateMachineQuotaFactors(rsSize.get());
      } else {
        // an empty value means the last load failed; ask for a reload and keep the old factor
        regionServersSize.refresh();
      }
    }

    /**
     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
     * factors as well. This relies on a more expensive query for ClusterMetrics.
     */
    private void updateTableMachineQuotaFactors() {
      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
      if (!clusterMetricsMaybe.isPresent()) {
        // an empty value means the last load failed; ask for a reload and keep the old factors
        tableRegionStatesClusterMetrics.refresh();
        return;
      }
      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
      updateMachineQuotaFactors(clusterMetrics.getServersName().size());

      Map<TableName, RegionStatesCount> tableRegionStatesCount =
        clusterMetrics.getTableRegionStatesCount();

      // Update table machine quota factors
      for (TableName tableName : tableQuotaCache.keySet()) {
        if (tableRegionStatesCount.containsKey(tableName)) {
          // stays 1 if the local region lookup below fails
          double factor = 1;
          try {
            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
            if (regionSize == 0) {
              factor = 0;
            } else {
              int localRegionSize = rsServices.getRegions(tableName).size();
              factor = 1.0 * localRegionSize / regionSize;
            }
          } catch (IOException e) {
            LOG.warn("Get table regions failed: {}", tableName, e);
          }
          tableMachineQuotaFactors.put(tableName, factor);
        } else {
          // TableName might have already been dropped (outdated)
          tableMachineQuotaFactors.remove(tableName);
        }
      }
    }

    /** Sets the machine factor to {@code 1 / rsSize}; a size of zero leaves it unchanged. */
    private void updateMachineQuotaFactors(int rsSize) {
      if (rsSize != 0) {
        // TODO if use rs group, the cluster limit should be shared by the rs group
        machineQuotaFactor = 1.0 / rsSize;
      }
    }
  }

  /**
   * For every key present in both maps, updates the old state object from the new one and puts
   * the old object into {@code newCache} in place of the new one. Keys found in only one of the
   * maps are left untouched.
   * <p>
   * visible for testing
   * @param oldCache the cache currently in use; its state objects are updated in place
   * @param newCache the freshly fetched cache; modified to reuse state objects from
   *                 {@code oldCache}
   */
  static <K, V extends QuotaState> void updateNewCacheFromOld(Map<K, V> oldCache,
    Map<K, V> newCache) {
    for (Map.Entry<K, V> entry : oldCache.entrySet()) {
      K key = entry.getKey();
      if (newCache.containsKey(key)) {
        V newState = newCache.get(key);
        V oldState = entry.getValue();
        oldState.update(newState);
        newCache.put(key, oldState);
      }
    }
  }

  /**
   * Holds a single lazily loaded value that expires a fixed time after it was written. A supplier
   * failure is logged and stored as {@link Optional#empty()} rather than propagated, so callers
   * must handle the empty case.
   */
  static class RefreshableExpiringValueCache<T> {
    private final String name;
    private final LoadingCache<String, Optional<T>> cache;

    /**
     * @param name          used both as the single cache key and in the failure log message
     * @param refreshPeriod how long a loaded value is retained after being written
     * @param supplier      produces the value; any exception it throws is caught and logged
     */
    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
      ThrowingSupplier<T> supplier) {
      this.name = name;
      this.cache =
        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
          .build(new CacheLoader<>() {
            @Override
            public Optional<T> load(String key) {
              try {
                return Optional.of(supplier.get());
              } catch (Exception e) {
                LOG.warn("Failed to refresh cache {}", name, e);
                return Optional.empty();
              }
            }
          });
    }

    /** Returns the cached value, loading it first if absent; empty if the load failed. */
    Optional<T> get() {
      return cache.getUnchecked(name);
    }

    /** Asks the underlying cache to reload the value. */
    void refresh() {
      cache.refresh(name);
    }

    /** Discards the cached value so the next {@link #get()} loads it again. */
    void invalidate() {
      cache.invalidate(name);
    }
  }

  /** A supplier whose {@code get} may throw a checked exception. */
  @FunctionalInterface
  static interface ThrowingSupplier<T> {
    T get() throws Exception;
  }

}