001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 022 023import org.apache.hadoop.hbase.ClusterMetrics.Option; 024import org.apache.hadoop.hbase.MetaTableAccessor; 025import org.apache.hadoop.hbase.regionserver.HRegionServer; 026import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 027 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.EnumSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.ConcurrentHashMap; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.ScheduledChore; 038import org.apache.hadoop.hbase.Stoppable; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.regionserver.RegionServerServices; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.security.UserGroupInformation; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.yetus.audience.InterfaceStability; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Cache that keeps track of the quota settings for the users and tables that 051 * are interacting with it. 052 * 053 * To avoid blocking the operations if the requested quota is not in cache 054 * an "empty quota" will be returned and the request to fetch the quota information 055 * will be enqueued for the next refresh. 056 * 057 * TODO: At the moment the Cache has a Chore that will be triggered every 5min 058 * or on cache-miss events. Later the Quotas will be pushed using the notification system. 059 */ 060@InterfaceAudience.Private 061@InterfaceStability.Evolving 062public class QuotaCache implements Stoppable { 063 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 064 065 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 066 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min 067 private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD 068 069 // for testing purpose only, enforce the cache to be always refreshed 070 static boolean TEST_FORCE_REFRESH = false; 071 072 private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>(); 073 private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>(); 074 private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>(); 075 private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache = 076 new ConcurrentHashMap<>(); 077 private volatile boolean exceedThrottleQuotaEnabled = false; 078 // factors used to divide cluster scope quota into machine scope quota 079 private volatile double machineQuotaFactor = 1; 080 private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors = 081 new ConcurrentHashMap<>(); 082 private final RegionServerServices rsServices; 083 084 private QuotaRefresherChore refreshChore; 085 private boolean stopped = true; 086 087 public QuotaCache(final RegionServerServices rsServices) { 088 this.rsServices = rsServices; 089 } 090 091 public void start() throws IOException { 092 stopped = false; 093 094 // TODO: This will be replaced once we have the notification bus ready. 095 Configuration conf = rsServices.getConfiguration(); 096 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 097 refreshChore = new QuotaRefresherChore(period, this); 098 rsServices.getChoreService().scheduleChore(refreshChore); 099 } 100 101 @Override 102 public void stop(final String why) { 103 if (refreshChore != null) { 104 LOG.debug("Stopping QuotaRefresherChore chore."); 105 refreshChore.cancel(true); 106 } 107 stopped = true; 108 } 109 110 @Override 111 public boolean isStopped() { 112 return stopped; 113 } 114 115 /** 116 * Returns the limiter associated to the specified user/table. 117 * 118 * @param ugi the user to limit 119 * @param table the table to limit 120 * @return the limiter associated to the specified user/table 121 */ 122 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 123 if (table.isSystemTable()) { 124 return NoopQuotaLimiter.get(); 125 } 126 return getUserQuotaState(ugi).getTableLimiter(table); 127 } 128 129 /** 130 * Returns the QuotaState associated to the specified user. 131 * @param ugi the user 132 * @return the quota info associated to specified user 133 */ 134 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 135 return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new, 136 this::triggerCacheRefresh); 137 } 138 139 /** 140 * Returns the limiter associated to the specified table. 141 * 142 * @param table the table to limit 143 * @return the limiter associated to the specified table 144 */ 145 public QuotaLimiter getTableLimiter(final TableName table) { 146 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter(); 147 } 148 149 /** 150 * Returns the limiter associated to the specified namespace. 151 * 152 * @param namespace the namespace to limit 153 * @return the limiter associated to the specified namespace 154 */ 155 public QuotaLimiter getNamespaceLimiter(final String namespace) { 156 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter(); 157 } 158 159 /** 160 * Returns the limiter associated to the specified region server. 161 * 162 * @param regionServer the region server to limit 163 * @return the limiter associated to the specified region server 164 */ 165 public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { 166 return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter(); 167 } 168 169 protected boolean isExceedThrottleQuotaEnabled() { 170 return exceedThrottleQuotaEnabled; 171 } 172 173 /** 174 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be 175 * returned and the quota request will be enqueued for the next cache refresh. 176 */ 177 private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, 178 final K key) { 179 return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); 180 } 181 182 @VisibleForTesting 183 void triggerCacheRefresh() { 184 refreshChore.triggerNow(); 185 } 186 187 @VisibleForTesting 188 long getLastUpdate() { 189 return refreshChore.lastUpdate; 190 } 191 192 @VisibleForTesting 193 Map<String, QuotaState> getNamespaceQuotaCache() { 194 return namespaceQuotaCache; 195 } 196 197 @VisibleForTesting 198 Map<String, QuotaState> getRegionServerQuotaCache() { 199 return regionServerQuotaCache; 200 } 201 202 @VisibleForTesting 203 Map<TableName, QuotaState> getTableQuotaCache() { 204 return tableQuotaCache; 205 } 206 207 @VisibleForTesting 208 Map<String, UserQuotaState> getUserQuotaCache() { 209 return userQuotaCache; 210 } 211 212 // TODO: Remove this once we have the notification bus 213 private class QuotaRefresherChore extends ScheduledChore { 214 private long lastUpdate = 0; 215 216 public QuotaRefresherChore(final int period, final Stoppable stoppable) { 217 super("QuotaRefresherChore", stoppable, period); 218 } 219 220 @Override 221 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES", 222 justification="I do not understand why the complaints, it looks good to me -- FIX") 223 protected void chore() { 224 // Prefetch online tables/namespaces 225 for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) { 226 if (table.isSystemTable()) continue; 227 if (!QuotaCache.this.tableQuotaCache.containsKey(table)) { 228 QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState()); 229 } 230 String ns = table.getNamespaceAsString(); 231 if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) { 232 QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState()); 233 } 234 } 235 QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, 236 new QuotaState()); 237 238 updateQuotaFactors(); 239 fetchNamespaceQuotaState(); 240 fetchTableQuotaState(); 241 fetchUserQuotaState(); 242 fetchRegionServerQuotaState(); 243 fetchExceedThrottleQuota(); 244 lastUpdate = EnvironmentEdgeManager.currentTime(); 245 } 246 247 private void fetchNamespaceQuotaState() { 248 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() { 249 @Override 250 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 251 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey()); 252 } 253 254 @Override 255 public Map<String, QuotaState> fetchEntries(final List<Get> gets) 256 throws IOException { 257 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, 258 machineQuotaFactor); 259 } 260 }); 261 } 262 263 private void fetchTableQuotaState() { 264 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() { 265 @Override 266 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) { 267 return QuotaUtil.makeGetForTableQuotas(entry.getKey()); 268 } 269 270 @Override 271 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) 272 throws IOException { 273 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, 274 tableMachineQuotaFactors); 275 } 276 }); 277 } 278 279 private void fetchUserQuotaState() { 280 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); 281 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); 282 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() { 283 @Override 284 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) { 285 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces); 286 } 287 288 @Override 289 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) 290 throws IOException { 291 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, 292 tableMachineQuotaFactors, machineQuotaFactor); 293 } 294 }); 295 } 296 297 private void fetchRegionServerQuotaState() { 298 fetch("regionServer", QuotaCache.this.regionServerQuotaCache, 299 new Fetcher<String, QuotaState>() { 300 @Override 301 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 302 return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey()); 303 } 304 305 @Override 306 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 307 return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets); 308 } 309 }); 310 } 311 312 private void fetchExceedThrottleQuota() { 313 try { 314 QuotaCache.this.exceedThrottleQuotaEnabled = 315 QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection()); 316 } catch (IOException e) { 317 LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e); 318 } 319 } 320 321 private <K, V extends QuotaState> void fetch(final String type, 322 final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { 323 long now = EnvironmentEdgeManager.currentTime(); 324 long refreshPeriod = getPeriod(); 325 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; 326 327 // Find the quota entries to update 328 List<Get> gets = new ArrayList<>(); 329 List<K> toRemove = new ArrayList<>(); 330 for (Map.Entry<K, V> entry: quotasMap.entrySet()) { 331 long lastUpdate = entry.getValue().getLastUpdate(); 332 long lastQuery = entry.getValue().getLastQuery(); 333 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { 334 toRemove.add(entry.getKey()); 335 } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) { 336 gets.add(fetcher.makeGet(entry)); 337 } 338 } 339 340 for (final K key: toRemove) { 341 if (LOG.isTraceEnabled()) { 342 LOG.trace("evict " + type + " key=" + key); 343 } 344 quotasMap.remove(key); 345 } 346 347 // fetch and update the quota entries 348 if (!gets.isEmpty()) { 349 try { 350 for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) { 351 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 352 if (quotaInfo != null) { 353 quotaInfo.update(entry.getValue()); 354 } 355 356 if (LOG.isTraceEnabled()) { 357 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); 358 } 359 } 360 } catch (IOException e) { 361 LOG.warn("Unable to read " + type + " from quota table", e); 362 } 363 } 364 } 365 366 /** 367 * Update quota factors which is used to divide cluster scope quota into machine scope quota 368 * 369 * For user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. 370 * For table/user over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] 371 * as machine factor. 372 */ 373 private void updateQuotaFactors() { 374 // Update machine quota factor 375 try { 376 int rsSize = rsServices.getConnection().getAdmin() 377 .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)).getServersName().size(); 378 if (rsSize != 0) { 379 // TODO if use rs group, the cluster limit should be shared by the rs group 380 machineQuotaFactor = 1.0 / rsSize; 381 } 382 } catch (IOException e) { 383 LOG.warn("Get live region servers failed", e); 384 } 385 386 // Update table machine quota factors 387 for (TableName tableName : tableQuotaCache.keySet()) { 388 double factor = 1; 389 try { 390 long regionSize = 391 MetaTableAccessor.getTableRegions(rsServices.getConnection(), tableName, true) 392 .stream().filter(regionInfo -> !regionInfo.isOffline()).count(); 393 if (regionSize == 0) { 394 factor = 0; 395 } else { 396 int localRegionSize = rsServices.getRegions(tableName).size(); 397 factor = 1.0 * localRegionSize / regionSize; 398 } 399 } catch (IOException e) { 400 LOG.warn("Get table regions failed: {}", tableName, e); 401 } 402 tableMachineQuotaFactors.put(tableName, factor); 403 } 404 } 405 } 406 407 static interface Fetcher<Key, Value> { 408 Get makeGet(Map.Entry<Key, Value> entry); 409 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; 410 } 411}