001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import java.io.IOException; 023import java.time.Duration; 024import java.util.ArrayList; 025import java.util.EnumSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.ClusterMetrics; 035import org.apache.hadoop.hbase.ClusterMetrics.Option; 036import org.apache.hadoop.hbase.ScheduledChore; 037import org.apache.hadoop.hbase.Stoppable; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.RegionStatesCount; 041import org.apache.hadoop.hbase.ipc.RpcCall; 042import org.apache.hadoop.hbase.ipc.RpcServer; 043import org.apache.hadoop.hbase.regionserver.HRegionServer; 044import org.apache.hadoop.hbase.regionserver.RegionServerServices; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.security.UserGroupInformation; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.yetus.audience.InterfaceStability; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 054import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 055import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 056 057/** 058 * Cache that keeps track of the quota settings for the users and tables that are interacting with 059 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will 060 * be returned and the request to fetch the quota information will be enqueued for the next refresh. 061 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss 062 * events. Later the Quotas will be pushed using the notification system. 063 */ 064@InterfaceAudience.Private 065@InterfaceStability.Evolving 066public class QuotaCache implements Stoppable { 067 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 068 069 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 070 public static final String TABLE_REGION_STATES_CACHE_TTL_MS = 071 "hbase.quota.cache.ttl.region.states.ms"; 072 public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS = 073 "hbase.quota.cache.ttl.servers.size.ms"; 074 075 // defines the request attribute key which, when provided, will override the request's username 076 // from the perspective of user quotas 077 public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY = 078 "hbase.quota.user.override.key"; 079 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min 080 private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD 081 082 // for testing purpose only, enforce the cache to be always refreshed 083 static boolean TEST_FORCE_REFRESH = false; 084 // for testing purpose only, block cache refreshes to reliably verify state 085 static boolean TEST_BLOCK_REFRESH = false; 086 087 private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>(); 088 private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>(); 089 private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>(); 090 private final ConcurrentMap<String, QuotaState> regionServerQuotaCache = 091 new ConcurrentHashMap<>(); 092 private volatile boolean exceedThrottleQuotaEnabled = false; 093 // factors used to divide cluster scope quota into machine scope quota 094 private volatile double machineQuotaFactor = 1; 095 private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors = 096 new ConcurrentHashMap<>(); 097 private final RegionServerServices rsServices; 098 private final String userOverrideRequestAttributeKey; 099 100 private QuotaRefresherChore refreshChore; 101 private boolean stopped = true; 102 103 public QuotaCache(final RegionServerServices rsServices) { 104 this.rsServices = rsServices; 105 this.userOverrideRequestAttributeKey = 106 rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY); 107 } 108 109 public void start() throws IOException { 110 stopped = false; 111 112 // TODO: This will be replaced once we have the notification bus ready. 113 Configuration conf = rsServices.getConfiguration(); 114 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 115 refreshChore = new QuotaRefresherChore(conf, period, this); 116 rsServices.getChoreService().scheduleChore(refreshChore); 117 } 118 119 @Override 120 public void stop(final String why) { 121 if (refreshChore != null) { 122 LOG.debug("Stopping QuotaRefresherChore chore."); 123 refreshChore.shutdown(true); 124 } 125 stopped = true; 126 } 127 128 @Override 129 public boolean isStopped() { 130 return stopped; 131 } 132 133 /** 134 * Returns the limiter associated to the specified user/table. 135 * @param ugi the user to limit 136 * @param table the table to limit 137 * @return the limiter associated to the specified user/table 138 */ 139 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 140 if (table.isSystemTable()) { 141 return NoopQuotaLimiter.get(); 142 } 143 return getUserQuotaState(ugi).getTableLimiter(table); 144 } 145 146 /** 147 * Returns the QuotaState associated to the specified user. 148 * @param ugi the user 149 * @return the quota info associated to specified user 150 */ 151 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 152 return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), 153 () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); 154 } 155 156 /** 157 * Returns the limiter associated to the specified table. 158 * @param table the table to limit 159 * @return the limiter associated to the specified table 160 */ 161 public QuotaLimiter getTableLimiter(final TableName table) { 162 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter(); 163 } 164 165 /** 166 * Returns the limiter associated to the specified namespace. 167 * @param namespace the namespace to limit 168 * @return the limiter associated to the specified namespace 169 */ 170 public QuotaLimiter getNamespaceLimiter(final String namespace) { 171 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter(); 172 } 173 174 /** 175 * Returns the limiter associated to the specified region server. 176 * @param regionServer the region server to limit 177 * @return the limiter associated to the specified region server 178 */ 179 public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { 180 return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter(); 181 } 182 183 protected boolean isExceedThrottleQuotaEnabled() { 184 return exceedThrottleQuotaEnabled; 185 } 186 187 /** 188 * Applies a request attribute user override if available, otherwise returns the UGI's short 189 * username 190 * @param ugi The request's UserGroupInformation 191 */ 192 private String getQuotaUserName(final UserGroupInformation ugi) { 193 if (userOverrideRequestAttributeKey == null) { 194 return ugi.getShortUserName(); 195 } 196 197 Optional<RpcCall> rpcCall = RpcServer.getCurrentCall(); 198 if (!rpcCall.isPresent()) { 199 return ugi.getShortUserName(); 200 } 201 202 byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey); 203 if (override == null) { 204 return ugi.getShortUserName(); 205 } 206 return Bytes.toString(override); 207 } 208 209 /** 210 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be 211 * returned and the quota request will be enqueued for the next cache refresh. 212 */ 213 private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) { 214 return computeIfAbsent(quotasMap, key, QuotaState::new); 215 } 216 217 void triggerCacheRefresh() { 218 refreshChore.triggerNow(); 219 } 220 221 long getLastUpdate() { 222 return refreshChore.lastUpdate; 223 } 224 225 Map<String, QuotaState> getNamespaceQuotaCache() { 226 return namespaceQuotaCache; 227 } 228 229 Map<String, QuotaState> getRegionServerQuotaCache() { 230 return regionServerQuotaCache; 231 } 232 233 Map<TableName, QuotaState> getTableQuotaCache() { 234 return tableQuotaCache; 235 } 236 237 Map<String, UserQuotaState> getUserQuotaCache() { 238 return userQuotaCache; 239 } 240 241 // TODO: Remove this once we have the notification bus 242 private class QuotaRefresherChore extends ScheduledChore { 243 private long lastUpdate = 0; 244 245 // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability. 246 // So we cache the results to reduce that load. 247 private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics; 248 private final RefreshableExpiringValueCache<Integer> regionServersSize; 249 250 public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) { 251 super("QuotaRefresherChore", stoppable, period); 252 253 Duration tableRegionStatesCacheTtl = 254 Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period)); 255 this.tableRegionStatesClusterMetrics = 256 new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics", 257 tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin() 258 .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT))); 259 260 Duration regionServersSizeCacheTtl = 261 Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period)); 262 regionServersSize = 263 new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl, 264 () -> rsServices.getConnection().getAdmin().getRegionServers().size()); 265 } 266 267 @Override 268 public synchronized boolean triggerNow() { 269 tableRegionStatesClusterMetrics.invalidate(); 270 regionServersSize.invalidate(); 271 return super.triggerNow(); 272 } 273 274 @Override 275 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES", 276 justification = "I do not understand why the complaints, it looks good to me -- FIX") 277 protected void chore() { 278 while (TEST_BLOCK_REFRESH) { 279 LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false"); 280 try { 281 Thread.sleep(10); 282 } catch (InterruptedException e) { 283 throw new RuntimeException(e); 284 } 285 } 286 // Prefetch online tables/namespaces 287 for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) { 288 if (table.isSystemTable()) { 289 continue; 290 } 291 QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState()); 292 293 final String ns = table.getNamespaceAsString(); 294 295 QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState()); 296 } 297 298 QuotaCache.this.regionServerQuotaCache 299 .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState()); 300 301 updateQuotaFactors(); 302 fetchNamespaceQuotaState(); 303 fetchTableQuotaState(); 304 fetchUserQuotaState(); 305 fetchRegionServerQuotaState(); 306 fetchExceedThrottleQuota(); 307 lastUpdate = EnvironmentEdgeManager.currentTime(); 308 } 309 310 private void fetchNamespaceQuotaState() { 311 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() { 312 @Override 313 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 314 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey()); 315 } 316 317 @Override 318 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 319 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, 320 machineQuotaFactor); 321 } 322 }); 323 } 324 325 private void fetchTableQuotaState() { 326 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() { 327 @Override 328 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) { 329 return QuotaUtil.makeGetForTableQuotas(entry.getKey()); 330 } 331 332 @Override 333 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 334 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, 335 tableMachineQuotaFactors); 336 } 337 }); 338 } 339 340 private void fetchUserQuotaState() { 341 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); 342 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); 343 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() { 344 @Override 345 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) { 346 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces); 347 } 348 349 @Override 350 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException { 351 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, 352 tableMachineQuotaFactors, machineQuotaFactor); 353 } 354 }); 355 } 356 357 private void fetchRegionServerQuotaState() { 358 fetch("regionServer", QuotaCache.this.regionServerQuotaCache, 359 new Fetcher<String, QuotaState>() { 360 @Override 361 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 362 return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey()); 363 } 364 365 @Override 366 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 367 return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets); 368 } 369 }); 370 } 371 372 private void fetchExceedThrottleQuota() { 373 try { 374 QuotaCache.this.exceedThrottleQuotaEnabled = 375 QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection()); 376 } catch (IOException e) { 377 LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e); 378 } 379 } 380 381 private <K, V extends QuotaState> void fetch(final String type, 382 final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { 383 long now = EnvironmentEdgeManager.currentTime(); 384 long refreshPeriod = getPeriod(); 385 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; 386 387 // Find the quota entries to update 388 List<Get> gets = new ArrayList<>(); 389 List<K> toRemove = new ArrayList<>(); 390 for (Map.Entry<K, V> entry : quotasMap.entrySet()) { 391 long lastUpdate = entry.getValue().getLastUpdate(); 392 long lastQuery = entry.getValue().getLastQuery(); 393 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { 394 toRemove.add(entry.getKey()); 395 } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) { 396 gets.add(fetcher.makeGet(entry)); 397 } 398 } 399 400 for (final K key : toRemove) { 401 if (LOG.isTraceEnabled()) { 402 LOG.trace("evict " + type + " key=" + key); 403 } 404 quotasMap.remove(key); 405 } 406 407 // fetch and update the quota entries 408 if (!gets.isEmpty()) { 409 try { 410 for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { 411 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 412 if (quotaInfo != null) { 413 quotaInfo.update(entry.getValue()); 414 } 415 416 if (LOG.isTraceEnabled()) { 417 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); 418 } 419 } 420 } catch (IOException e) { 421 LOG.warn("Unable to read " + type + " from quota table", e); 422 } 423 } 424 } 425 426 /** 427 * Update quota factors which is used to divide cluster scope quota into machine scope quota For 428 * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user 429 * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor. 430 */ 431 private void updateQuotaFactors() { 432 boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty() 433 || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters); 434 if (hasTableQuotas) { 435 updateTableMachineQuotaFactors(); 436 } else { 437 updateOnlyMachineQuotaFactors(); 438 } 439 } 440 441 /** 442 * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if 443 * we don't have any table quotas in the cache. 444 */ 445 private void updateOnlyMachineQuotaFactors() { 446 Optional<Integer> rsSize = regionServersSize.get(); 447 if (rsSize.isPresent()) { 448 updateMachineQuotaFactors(rsSize.get()); 449 } else { 450 regionServersSize.refresh(); 451 } 452 } 453 454 /** 455 * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine 456 * factors as well. This relies on a more expensive query for ClusterMetrics. 457 */ 458 private void updateTableMachineQuotaFactors() { 459 Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get(); 460 if (!clusterMetricsMaybe.isPresent()) { 461 tableRegionStatesClusterMetrics.refresh(); 462 return; 463 } 464 ClusterMetrics clusterMetrics = clusterMetricsMaybe.get(); 465 updateMachineQuotaFactors(clusterMetrics.getServersName().size()); 466 467 Map<TableName, RegionStatesCount> tableRegionStatesCount = 468 clusterMetrics.getTableRegionStatesCount(); 469 470 // Update table machine quota factors 471 for (TableName tableName : tableQuotaCache.keySet()) { 472 if (tableRegionStatesCount.containsKey(tableName)) { 473 double factor = 1; 474 try { 475 long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions(); 476 if (regionSize == 0) { 477 factor = 0; 478 } else { 479 int localRegionSize = rsServices.getRegions(tableName).size(); 480 factor = 1.0 * localRegionSize / regionSize; 481 } 482 } catch (IOException e) { 483 LOG.warn("Get table regions failed: {}", tableName, e); 484 } 485 tableMachineQuotaFactors.put(tableName, factor); 486 } else { 487 // TableName might have already been dropped (outdated) 488 tableMachineQuotaFactors.remove(tableName); 489 } 490 } 491 } 492 493 private void updateMachineQuotaFactors(int rsSize) { 494 if (rsSize != 0) { 495 // TODO if use rs group, the cluster limit should be shared by the rs group 496 machineQuotaFactor = 1.0 / rsSize; 497 } 498 } 499 } 500 501 static class RefreshableExpiringValueCache<T> { 502 private final String name; 503 private final LoadingCache<String, Optional<T>> cache; 504 505 RefreshableExpiringValueCache(String name, Duration refreshPeriod, 506 ThrowingSupplier<T> supplier) { 507 this.name = name; 508 this.cache = 509 CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS) 510 .build(new CacheLoader<>() { 511 @Override 512 public Optional<T> load(String key) { 513 try { 514 return Optional.of(supplier.get()); 515 } catch (Exception e) { 516 LOG.warn("Failed to refresh cache {}", name, e); 517 return Optional.empty(); 518 } 519 } 520 }); 521 } 522 523 Optional<T> get() { 524 return cache.getUnchecked(name); 525 } 526 527 void refresh() { 528 cache.refresh(name); 529 } 530 531 void invalidate() { 532 cache.invalidate(name); 533 } 534 } 535 536 @FunctionalInterface 537 static interface ThrowingSupplier<T> { 538 T get() throws Exception; 539 } 540 541 static interface Fetcher<Key, Value> { 542 Get makeGet(Map.Entry<Key, Value> entry); 543 544 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; 545 } 546}