/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * Cache that keeps track of the quota settings for the users and tables that are interacting with
 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
 * TODO: At the moment the Cache has a Chore that runs once per refresh period (configured through
 * {@link #REFRESH_CONF_KEY}, 12 hours by default) or whenever a refresh is explicitly triggered.
 * Later the Quotas will be pushed using the notification system.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaCache implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
    "hbase.quota.cache.ttl.region.states.ms";
  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
    "hbase.quota.cache.ttl.servers.size.ms";

  // defines the request attribute key which, when provided, will override the request's username
  // from the perspective of user quotas
  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
    "hbase.quota.user.override.key";
  private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours
  private static final int EVICT_PERIOD_FACTOR = 5;

  // for testing purpose only, enforce the cache to be always refreshed
  static boolean TEST_FORCE_REFRESH = false;
  // for testing purpose only, block cache refreshes to reliably verify state
  static boolean TEST_BLOCK_REFRESH = false;

  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
    new ConcurrentHashMap<>();
  private volatile boolean exceedThrottleQuotaEnabled = false;
  // factors used to divide cluster scope quota into machine scope quota
  private volatile double machineQuotaFactor = 1;
  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
    new ConcurrentHashMap<>();
  private final RegionServerServices rsServices;
  private final String userOverrideRequestAttributeKey;

  private QuotaRefresherChore refreshChore;
  // Written by start()/stop() and read through isStopped(), which the refresh chore is given as
  // its Stoppable; volatile so the flag is visible across threads.
  private volatile boolean stopped = true;

  /**
   * Creates a stopped cache; call {@link #start()} to schedule the refresh chore.
   * @param rsServices the region server services used to read configuration, schedule the chore
   *                   and reach the cluster connection
   */
  public QuotaCache(final RegionServerServices rsServices) {
    this.rsServices = rsServices;
    this.userOverrideRequestAttributeKey =
      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
  }

  /**
   * Marks the cache as running and schedules the {@link QuotaRefresherChore} on the region
   * server's chore service.
   */
  public void start() throws IOException {
    stopped = false;

    Configuration conf = rsServices.getConfiguration();
    // Refresh the cache every 12 hours, and every time a quota is changed, and every time a
    // configuration reload is triggered. Periodic reloads are kept to a minimum to avoid flooding
    // the RegionServer holding the hbase:quota table with requests.
    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
    refreshChore = new QuotaRefresherChore(conf, period, this);
    rsServices.getChoreService().scheduleChore(refreshChore);
  }

  /**
   * Shuts down the refresh chore (if one was started) and marks the cache as stopped.
   * @param why the reason for stopping; not used by this implementation
   */
  @Override
  public void stop(final String why) {
    if (refreshChore != null) {
      LOG.debug("Stopping QuotaRefresherChore chore.");
      refreshChore.shutdown(true);
    }
    stopped = true;
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }

  /**
   * Returns the limiter associated to the specified user/table. System tables are never throttled
   * and always get the no-op limiter.
   * @param ugi   the user to limit
   * @param table the table to limit
   * @return the limiter associated to the specified user/table
   */
  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
    if (table.isSystemTable()) {
      return NoopQuotaLimiter.get();
    }
    return getUserQuotaState(ugi).getTableLimiter(table);
  }

  /**
   * Returns the QuotaState associated to the specified user. If the user is not cached yet, a
   * default state built from the configuration is inserted and returned.
   * @param ugi the user
   * @return the quota info associated to specified user
   */
  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
    return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
      () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
  }

  /**
   * Returns the limiter associated to the specified table.
   * @param table the table to limit
   * @return the limiter associated to the specified table
   */
  public QuotaLimiter getTableLimiter(final TableName table) {
    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
  }

  /**
   * Returns the limiter associated to the specified namespace.
   * @param namespace the namespace to limit
   * @return the limiter associated to the specified namespace
   */
  public QuotaLimiter getNamespaceLimiter(final String namespace) {
    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
  }

  /**
   * Returns the limiter associated to the specified region server.
   * @param regionServer the region server to limit
   * @return the limiter associated to the specified region server
   */
  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
  }

  /** Returns the exceed-throttle-quota flag as last read from the quota table. */
  protected boolean isExceedThrottleQuotaEnabled() {
    return exceedThrottleQuotaEnabled;
  }

  /**
   * Applies a request attribute user override if available, otherwise returns the UGI's short
   * username. The override is only consulted when an override key is configured, there is a
   * current RPC call, and that call carries the attribute.
   * @param ugi The request's UserGroupInformation
   * @return the user name to account user quotas against
   */
  private String getQuotaUserName(final UserGroupInformation ugi) {
    if (userOverrideRequestAttributeKey == null) {
      return ugi.getShortUserName();
    }

    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
    if (!rpcCall.isPresent()) {
      return ugi.getShortUserName();
    }

    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
    if (override == null) {
      return ugi.getShortUserName();
    }
    return Bytes.toString(override);
  }

  /**
   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
   * returned and the quota request will be enqueued for the next cache refresh.
   */
  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
    return computeIfAbsent(quotasMap, key, QuotaState::new);
  }

  /** Invalidates the cached cluster metrics and asks the chore to run as soon as possible. */
  void triggerCacheRefresh() {
    refreshChore.triggerNow();
  }

  /** Runs one refresh on the calling thread. */
  void forceSynchronousCacheRefresh() {
    refreshChore.chore();
  }

  /** Returns the time at which the last refresh completed, or 0 if none has completed yet. */
  long getLastUpdate() {
    return refreshChore.lastUpdate;
  }

  Map<String, QuotaState> getNamespaceQuotaCache() {
    return namespaceQuotaCache;
  }

  Map<String, QuotaState> getRegionServerQuotaCache() {
    return regionServerQuotaCache;
  }

  Map<TableName, QuotaState> getTableQuotaCache() {
    return tableQuotaCache;
  }

  Map<String, UserQuotaState> getUserQuotaCache() {
    return userQuotaCache;
  }

  // TODO: Remove this once we have the notification bus
  private class QuotaRefresherChore extends ScheduledChore {
    private long lastUpdate = 0;

    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
    // So we cache the results to reduce that load.
    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
    private final RefreshableExpiringValueCache<Integer> regionServersSize;

    /**
     * @param conf      configuration supplying the cluster-metrics cache TTLs; each TTL defaults
     *                  to {@code period} when unset
     * @param period    the chore period in milliseconds
     * @param stoppable the object the chore consults to know when to stop
     */
    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
      super("QuotaRefresherChore", stoppable, period);

      Duration tableRegionStatesCacheTtl =
        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
      this.tableRegionStatesClusterMetrics = new RefreshableExpiringValueCache<>(
        "tableRegionStatesClusterMetrics", tableRegionStatesCacheTtl, () -> {
          // The caller of Connection#getAdmin() owns the returned Admin and must close it.
          try (Admin admin = rsServices.getConnection().getAdmin()) {
            return admin
              .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
          }
        });

      Duration regionServersSizeCacheTtl =
        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
      this.regionServersSize = new RefreshableExpiringValueCache<>("regionServersSize",
        regionServersSizeCacheTtl, () -> {
          // The caller of Connection#getAdmin() owns the returned Admin and must close it.
          try (Admin admin = rsServices.getConnection().getAdmin()) {
            return admin.getRegionServers().size();
          }
        });
    }

    /**
     * Drops the cached cluster metrics before triggering the chore, so that the triggered run
     * reloads them instead of reusing cached values.
     */
    @Override
    public synchronized boolean triggerNow() {
      tableRegionStatesClusterMetrics.invalidate();
      regionServersSize.invalidate();
      return super.triggerNow();
    }

    /**
     * Performs one full refresh: registers empty entries for the online user tables, their
     * namespaces and the region server row, recomputes the machine quota factors, reloads every
     * quota cache from the quota table and finally records the completion time.
     */
    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
        justification = "I do not understand why the complaints, it looks good to me -- FIX")
    protected void chore() {
      while (TEST_BLOCK_REFRESH) {
        LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // Restore the interrupt status so code further up the stack can still observe it.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
      // Prefetch online tables/namespaces
      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
        if (table.isSystemTable()) {
          continue;
        }
        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());

        final String ns = table.getNamespaceAsString();

        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
      }

      QuotaCache.this.regionServerQuotaCache
        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());

      updateQuotaFactors();
      fetchNamespaceQuotaState();
      fetchTableQuotaState();
      fetchUserQuotaState();
      fetchRegionServerQuotaState();
      fetchExceedThrottleQuota();
      lastUpdate = EnvironmentEdgeManager.currentTime();
    }

    /** Reloads the namespace quota cache, scaling quotas by the machine quota factor. */
    private void fetchNamespaceQuotaState() {
      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
        @Override
        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
        }

        @Override
        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
            machineQuotaFactor);
        }
      });
    }

    /** Reloads the table quota cache, scaling quotas by the per-table machine quota factors. */
    private void fetchTableQuotaState() {
      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
        @Override
        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
        }

        @Override
        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
            tableMachineQuotaFactors);
        }
      });
    }

    /**
     * Reloads the user quota cache. The Gets are built against the tables and namespaces currently
     * present in the table and namespace caches.
     */
    private void fetchUserQuotaState() {
      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
        @Override
        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
        }

        @Override
        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
            tableMachineQuotaFactors, machineQuotaFactor);
        }
      });
    }

    /** Reloads the region server quota cache. */
    private void fetchRegionServerQuotaState() {
      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
        new Fetcher<String, QuotaState>() {
          @Override
          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
          }

          @Override
          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
          }
        });
    }

    /**
     * Re-reads the exceed-throttle-quota flag. On failure the previous value is kept and a warning
     * is logged.
     */
    private void fetchExceedThrottleQuota() {
      try {
        QuotaCache.this.exceedThrottleQuotaEnabled =
          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
      } catch (IOException e) {
        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
      }
    }

    /**
     * Refreshes one quota cache. Entries that have been queried at least once but not within the
     * last {@code period * EVICT_PERIOD_FACTOR} milliseconds are evicted; every other entry is
     * re-read through the fetcher and merged into the map. A failure to read is logged and leaves
     * the remaining entries untouched.
     * @param type      label used in log messages
     * @param quotasMap the cache to refresh
     * @param fetcher   builds the Gets and reads the corresponding entries
     */
    private <K, V extends QuotaState> void fetch(final String type,
      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
      long now = EnvironmentEdgeManager.currentTime();
      // Multiply as long: getPeriod() is an int and a large configured period would otherwise
      // overflow to a negative eviction period, evicting every queried entry on each refresh.
      long evictPeriod = (long) getPeriod() * EVICT_PERIOD_FACTOR;
      // Find the quota entries to update
      List<Get> gets = new ArrayList<>();
      List<K> toRemove = new ArrayList<>();
      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
        long lastQuery = entry.getValue().getLastQuery();
        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
          toRemove.add(entry.getKey());
        } else {
          gets.add(fetcher.makeGet(entry));
        }
      }

      for (final K key : toRemove) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("evict {} key={}", type, key);
        }
        quotasMap.remove(key);
      }

      // fetch and update the quota entries
      if (gets.isEmpty()) {
        return;
      }
      try {
        for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
          V fetched = entry.getValue();
          V cached = quotasMap.putIfAbsent(entry.getKey(), fetched);
          if (cached != null) {
            cached.update(fetched);
          }

          if (LOG.isTraceEnabled()) {
            // putIfAbsent returns null when the fetched state was inserted; log whichever
            // instance is now in the cache rather than "null".
            LOG.trace("refresh {} key={} quotas={}", type, entry.getKey(),
              cached != null ? cached : fetched);
          }
        }
      } catch (IOException e) {
        LOG.warn("Unable to read " + type + " from quota table", e);
      }
    }

    /**
     * Update quota factors which is used to divide cluster scope quota into machine scope quota
     * For user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For
     * table/user over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as
     * machine factor.
     */
    private void updateQuotaFactors() {
      boolean hasTableQuotas = !tableQuotaCache.isEmpty()
        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
      if (hasTableQuotas) {
        updateTableMachineQuotaFactors();
      } else {
        updateOnlyMachineQuotaFactors();
      }
    }

    /**
     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
     * we don't have any table quotas in the cache.
     */
    private void updateOnlyMachineQuotaFactors() {
      Optional<Integer> rsSize = regionServersSize.get();
      if (rsSize.isPresent()) {
        updateMachineQuotaFactors(rsSize.get());
      } else {
        // The last load failed; ask for a reload so a later run can pick up a value.
        regionServersSize.refresh();
      }
    }

    /**
     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
     * factors as well. This relies on a more expensive query for ClusterMetrics.
     */
    private void updateTableMachineQuotaFactors() {
      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
      if (!clusterMetricsMaybe.isPresent()) {
        // The last load failed; ask for a reload so a later run can pick up a value.
        tableRegionStatesClusterMetrics.refresh();
        return;
      }
      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
      updateMachineQuotaFactors(clusterMetrics.getServersName().size());

      Map<TableName, RegionStatesCount> tableRegionStatesCount =
        clusterMetrics.getTableRegionStatesCount();

      // Update table machine quota factors
      for (TableName tableName : tableQuotaCache.keySet()) {
        if (tableRegionStatesCount.containsKey(tableName)) {
          // Stays at 1 if the local regions cannot be listed.
          double factor = 1;
          try {
            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
            if (regionSize == 0) {
              factor = 0;
            } else {
              int localRegionSize = rsServices.getRegions(tableName).size();
              factor = 1.0 * localRegionSize / regionSize;
            }
          } catch (IOException e) {
            LOG.warn("Get table regions failed: {}", tableName, e);
          }
          tableMachineQuotaFactors.put(tableName, factor);
        } else {
          // TableName might have already been dropped (outdated)
          tableMachineQuotaFactors.remove(tableName);
        }
      }
    }

    /**
     * Sets the machine quota factor to {@code 1 / rsSize}; a size of 0 leaves the current factor
     * unchanged.
     */
    private void updateMachineQuotaFactors(int rsSize) {
      if (rsSize != 0) {
        // TODO if use rs group, the cluster limit should be shared by the rs group
        machineQuotaFactor = 1.0 / rsSize;
      }
    }
  }

  /**
   * Holds a single value in a {@link LoadingCache}, stored under the cache's own name as key and
   * expiring a fixed time after it was written. A supplier failure is logged and stored as an
   * empty Optional instead of being propagated.
   */
  static class RefreshableExpiringValueCache<T> {
    private final String name;
    private final LoadingCache<String, Optional<T>> cache;

    /**
     * @param name          name used as the cache key and in log messages
     * @param refreshPeriod how long a loaded value is kept after being written
     * @param supplier      loads the value; any exception it throws is logged and turned into an
     *                      empty Optional
     */
    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
      ThrowingSupplier<T> supplier) {
      this.name = name;
      this.cache =
        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
          .build(new CacheLoader<>() {
            @Override
            public Optional<T> load(String key) {
              try {
                return Optional.of(supplier.get());
              } catch (Exception e) {
                LOG.warn("Failed to refresh cache {}", name, e);
                return Optional.empty();
              }
            }
          });
    }

    /** Returns the cached value, loading it first if absent; empty if the load failed. */
    Optional<T> get() {
      return cache.getUnchecked(name);
    }

    /** Asks the underlying cache to reload the value. */
    void refresh() {
      cache.refresh(name);
    }

    /** Discards the cached value so that the next {@link #get()} reloads it. */
    void invalidate() {
      cache.invalidate(name);
    }
  }

  /** A supplier that is allowed to throw a checked exception. */
  @FunctionalInterface
  static interface ThrowingSupplier<T> {
    T get() throws Exception;
  }

  /** Strategy used by the refresh chore to read one kind of quota entry. */
  static interface Fetcher<Key, Value> {
    /** Builds the Get used to read the given cache entry. */
    Get makeGet(Map.Entry<Key, Value> entry);

    /** Executes the given Gets and returns the entries that were read, by key. */
    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
  }
}