001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.time.Duration; 022import java.util.EnumSet; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Optional; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.TimeUnit; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.ClusterMetrics; 030import org.apache.hadoop.hbase.ClusterMetrics.Option; 031import org.apache.hadoop.hbase.ScheduledChore; 032import org.apache.hadoop.hbase.Stoppable; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.RegionStatesCount; 035import org.apache.hadoop.hbase.ipc.RpcCall; 036import org.apache.hadoop.hbase.ipc.RpcServer; 037import org.apache.hadoop.hbase.regionserver.RegionServerServices; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.security.UserGroupInformation; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.yetus.audience.InterfaceStability; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 046import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 047import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 048 049/** 050 * Cache that keeps track of the quota settings for the users and tables that are interacting with 051 * it. 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Evolving 055public class QuotaCache implements Stoppable { 056 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 057 058 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 059 public static final String TABLE_REGION_STATES_CACHE_TTL_MS = 060 "hbase.quota.cache.ttl.region.states.ms"; 061 public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS = 062 "hbase.quota.cache.ttl.servers.size.ms"; 063 064 // defines the request attribute key which, when provided, will override the request's username 065 // from the perspective of user quotas 066 public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY = 067 "hbase.quota.user.override.key"; 068 private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours 069 070 private final Object initializerLock = new Object(); 071 private volatile boolean initialized = false; 072 073 private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>(); 074 private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>(); 075 private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>(); 076 private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>(); 077 078 private volatile boolean exceedThrottleQuotaEnabled = false; 079 // factors used to divide cluster scope quota into machine scope quota 080 private volatile double machineQuotaFactor = 1; 081 private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors = 082 new ConcurrentHashMap<>(); 083 private final RegionServerServices rsServices; 084 private final String userOverrideRequestAttributeKey; 085 086 private QuotaRefresherChore refreshChore; 087 private boolean stopped = true; 088 089 public QuotaCache(final RegionServerServices rsServices) { 090 this.rsServices = rsServices; 091 this.userOverrideRequestAttributeKey = 092 rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY); 093 } 094 095 public void start() throws IOException { 096 stopped = false; 097 098 Configuration conf = rsServices.getConfiguration(); 099 // Refresh the cache every 12 hours, and every time a quota is changed, and every time a 100 // configuration reload is triggered. Periodic reloads are kept to a minimum to avoid 101 // flooding the RegionServer holding the hbase:quota table with requests. 102 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 103 refreshChore = new QuotaRefresherChore(conf, period, this); 104 rsServices.getChoreService().scheduleChore(refreshChore); 105 } 106 107 @Override 108 public void stop(final String why) { 109 if (refreshChore != null) { 110 LOG.debug("Stopping QuotaRefresherChore chore."); 111 refreshChore.shutdown(true); 112 } 113 stopped = true; 114 } 115 116 @Override 117 public boolean isStopped() { 118 return stopped; 119 } 120 121 private void ensureInitialized() { 122 if (!initialized) { 123 synchronized (initializerLock) { 124 if (!initialized) { 125 refreshChore.chore(); 126 initialized = true; 127 } 128 } 129 } 130 } 131 132 private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException { 133 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), tableMachineQuotaFactors, 134 machineQuotaFactor); 135 } 136 137 private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException { 138 return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection()); 139 } 140 141 private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException { 142 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), tableMachineQuotaFactors); 143 } 144 145 private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException { 146 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), machineQuotaFactor); 147 } 148 149 /** 150 * Returns the limiter associated to the specified user/table. 151 * @param ugi the user to limit 152 * @param table the table to limit 153 * @return the limiter associated to the specified user/table 154 */ 155 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 156 if (table.isSystemTable()) { 157 return NoopQuotaLimiter.get(); 158 } 159 return getUserQuotaState(ugi).getTableLimiter(table); 160 } 161 162 /** 163 * Returns the QuotaState associated to the specified user. 164 * @param ugi the user 165 * @return the quota info associated to specified user 166 */ 167 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 168 String user = getQuotaUserName(ugi); 169 ensureInitialized(); 170 // local reference because the chore thread may assign to userQuotaCache 171 Map<String, UserQuotaState> cache = userQuotaCache; 172 if (!cache.containsKey(user)) { 173 cache.put(user, QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration())); 174 } 175 return cache.get(user); 176 } 177 178 /** 179 * Returns the limiter associated to the specified table. 180 * @param table the table to limit 181 * @return the limiter associated to the specified table 182 */ 183 public QuotaLimiter getTableLimiter(final TableName table) { 184 ensureInitialized(); 185 // local reference because the chore thread may assign to tableQuotaCache 186 Map<TableName, QuotaState> cache = tableQuotaCache; 187 if (!cache.containsKey(table)) { 188 cache.put(table, new QuotaState()); 189 } 190 return cache.get(table).getGlobalLimiter(); 191 } 192 193 /** 194 * Returns the limiter associated to the specified namespace. 195 * @param namespace the namespace to limit 196 * @return the limiter associated to the specified namespace 197 */ 198 public QuotaLimiter getNamespaceLimiter(final String namespace) { 199 ensureInitialized(); 200 // local reference because the chore thread may assign to namespaceQuotaCache 201 Map<String, QuotaState> cache = namespaceQuotaCache; 202 if (!cache.containsKey(namespace)) { 203 cache.put(namespace, new QuotaState()); 204 } 205 return cache.get(namespace).getGlobalLimiter(); 206 } 207 208 /** 209 * Returns the limiter associated to the specified region server. 210 * @param regionServer the region server to limit 211 * @return the limiter associated to the specified region server 212 */ 213 public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { 214 ensureInitialized(); 215 // local reference because the chore thread may assign to regionServerQuotaCache 216 Map<String, QuotaState> cache = regionServerQuotaCache; 217 if (!cache.containsKey(regionServer)) { 218 cache.put(regionServer, new QuotaState()); 219 } 220 return cache.get(regionServer).getGlobalLimiter(); 221 } 222 223 protected boolean isExceedThrottleQuotaEnabled() { 224 return exceedThrottleQuotaEnabled; 225 } 226 227 /** 228 * Applies a request attribute user override if available, otherwise returns the UGI's short 229 * username 230 * @param ugi The request's UserGroupInformation 231 */ 232 private String getQuotaUserName(final UserGroupInformation ugi) { 233 if (userOverrideRequestAttributeKey == null) { 234 return ugi.getShortUserName(); 235 } 236 237 Optional<RpcCall> rpcCall = RpcServer.getCurrentCall(); 238 if (!rpcCall.isPresent()) { 239 return ugi.getShortUserName(); 240 } 241 242 byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey); 243 if (override == null) { 244 return ugi.getShortUserName(); 245 } 246 return Bytes.toString(override); 247 } 248 249 void triggerCacheRefresh() { 250 refreshChore.triggerNow(); 251 } 252 253 void forceSynchronousCacheRefresh() { 254 refreshChore.chore(); 255 } 256 257 /** visible for testing */ 258 Map<String, QuotaState> getNamespaceQuotaCache() { 259 return namespaceQuotaCache; 260 } 261 262 /** visible for testing */ 263 Map<String, QuotaState> getRegionServerQuotaCache() { 264 return regionServerQuotaCache; 265 } 266 267 /** visible for testing */ 268 Map<TableName, QuotaState> getTableQuotaCache() { 269 return tableQuotaCache; 270 } 271 272 /** visible for testing */ 273 Map<String, UserQuotaState> getUserQuotaCache() { 274 return userQuotaCache; 275 } 276 277 // TODO: Remove this once we have the notification bus 278 private class QuotaRefresherChore extends ScheduledChore { 279 // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability. 280 // So we cache the results to reduce that load. 281 private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics; 282 private final RefreshableExpiringValueCache<Integer> regionServersSize; 283 284 public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) { 285 super("QuotaRefresherChore", stoppable, period); 286 287 Duration tableRegionStatesCacheTtl = 288 Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period)); 289 this.tableRegionStatesClusterMetrics = 290 new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics", 291 tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin() 292 .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT))); 293 294 Duration regionServersSizeCacheTtl = 295 Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period)); 296 regionServersSize = 297 new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl, 298 () -> rsServices.getConnection().getAdmin().getRegionServers().size()); 299 } 300 301 @Override 302 public synchronized boolean triggerNow() { 303 tableRegionStatesClusterMetrics.invalidate(); 304 regionServersSize.invalidate(); 305 return super.triggerNow(); 306 } 307 308 @Override 309 protected void chore() { 310 updateQuotaFactors(); 311 312 try { 313 Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries()); 314 updateNewCacheFromOld(userQuotaCache, newUserQuotaCache); 315 userQuotaCache = newUserQuotaCache; 316 } catch (IOException e) { 317 LOG.error("Error while fetching user quotas", e); 318 } 319 320 try { 321 Map<String, QuotaState> newRegionServerQuotaCache = 322 new HashMap<>(fetchRegionServerQuotaStateEntries()); 323 updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache); 324 regionServerQuotaCache = newRegionServerQuotaCache; 325 } catch (IOException e) { 326 LOG.error("Error while fetching region server quotas", e); 327 } 328 329 try { 330 Map<TableName, QuotaState> newTableQuotaCache = 331 new HashMap<>(fetchTableQuotaStateEntries()); 332 updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache); 333 tableQuotaCache = newTableQuotaCache; 334 } catch (IOException e) { 335 LOG.error("Error while refreshing table quotas", e); 336 } 337 338 try { 339 Map<String, QuotaState> newNamespaceQuotaCache = 340 new HashMap<>(fetchNamespaceQuotaStateEntries()); 341 updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache); 342 namespaceQuotaCache = newNamespaceQuotaCache; 343 } catch (IOException e) { 344 LOG.error("Error while refreshing namespace quotas", e); 345 } 346 347 fetchExceedThrottleQuota(); 348 } 349 350 private void fetchExceedThrottleQuota() { 351 try { 352 QuotaCache.this.exceedThrottleQuotaEnabled = 353 QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection()); 354 } catch (IOException e) { 355 LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e); 356 } 357 } 358 359 /** 360 * Update quota factors which is used to divide cluster scope quota into machine scope quota For 361 * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user 362 * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor. 363 */ 364 private void updateQuotaFactors() { 365 boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty() 366 || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters); 367 if (hasTableQuotas) { 368 updateTableMachineQuotaFactors(); 369 } else { 370 updateOnlyMachineQuotaFactors(); 371 } 372 } 373 374 /** 375 * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if 376 * we don't have any table quotas in the cache. 377 */ 378 private void updateOnlyMachineQuotaFactors() { 379 Optional<Integer> rsSize = regionServersSize.get(); 380 if (rsSize.isPresent()) { 381 updateMachineQuotaFactors(rsSize.get()); 382 } else { 383 regionServersSize.refresh(); 384 } 385 } 386 387 /** 388 * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine 389 * factors as well. This relies on a more expensive query for ClusterMetrics. 390 */ 391 private void updateTableMachineQuotaFactors() { 392 Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get(); 393 if (!clusterMetricsMaybe.isPresent()) { 394 tableRegionStatesClusterMetrics.refresh(); 395 return; 396 } 397 ClusterMetrics clusterMetrics = clusterMetricsMaybe.get(); 398 updateMachineQuotaFactors(clusterMetrics.getServersName().size()); 399 400 Map<TableName, RegionStatesCount> tableRegionStatesCount = 401 clusterMetrics.getTableRegionStatesCount(); 402 403 // Update table machine quota factors 404 for (TableName tableName : tableQuotaCache.keySet()) { 405 if (tableRegionStatesCount.containsKey(tableName)) { 406 double factor = 1; 407 try { 408 long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions(); 409 if (regionSize == 0) { 410 factor = 0; 411 } else { 412 int localRegionSize = rsServices.getRegions(tableName).size(); 413 factor = 1.0 * localRegionSize / regionSize; 414 } 415 } catch (IOException e) { 416 LOG.warn("Get table regions failed: {}", tableName, e); 417 } 418 tableMachineQuotaFactors.put(tableName, factor); 419 } else { 420 // TableName might have already been dropped (outdated) 421 tableMachineQuotaFactors.remove(tableName); 422 } 423 } 424 } 425 426 private void updateMachineQuotaFactors(int rsSize) { 427 if (rsSize != 0) { 428 // TODO if use rs group, the cluster limit should be shared by the rs group 429 machineQuotaFactor = 1.0 / rsSize; 430 } 431 } 432 } 433 434 /** visible for testing */ 435 static <K, V extends QuotaState> void updateNewCacheFromOld(Map<K, V> oldCache, 436 Map<K, V> newCache) { 437 for (Map.Entry<K, V> entry : oldCache.entrySet()) { 438 K key = entry.getKey(); 439 if (newCache.containsKey(key)) { 440 V newState = newCache.get(key); 441 V oldState = entry.getValue(); 442 oldState.update(newState); 443 newCache.put(key, oldState); 444 } 445 } 446 } 447 448 static class RefreshableExpiringValueCache<T> { 449 private final String name; 450 private final LoadingCache<String, Optional<T>> cache; 451 452 RefreshableExpiringValueCache(String name, Duration refreshPeriod, 453 ThrowingSupplier<T> supplier) { 454 this.name = name; 455 this.cache = 456 CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS) 457 .build(new CacheLoader<>() { 458 @Override 459 public Optional<T> load(String key) { 460 try { 461 return Optional.of(supplier.get()); 462 } catch (Exception e) { 463 LOG.warn("Failed to refresh cache {}", name, e); 464 return Optional.empty(); 465 } 466 } 467 }); 468 } 469 470 Optional<T> get() { 471 return cache.getUnchecked(name); 472 } 473 474 void refresh() { 475 cache.refresh(name); 476 } 477 478 void invalidate() { 479 cache.invalidate(name); 480 } 481 } 482 483 @FunctionalInterface 484 static interface ThrowingSupplier<T> { 485 T get() throws Exception; 486 } 487 488}