/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache that keeps track of the quota settings for the users and tables that
 * are interacting with it.
 *
 * <p>To avoid blocking the operations if the requested quota is not in cache
 * an "empty quota" will be returned and the request to fetch the quota information
 * will be enqueued for the next refresh.
 *
 * <p>TODO: At the moment the Cache has a Chore that will be triggered every 5min
 * or on cache-miss events. Later the Quotas will be pushed using the notification system.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaCache implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

  /** Configuration key holding the refresh period handed to the refresher chore. */
  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD

  // for testing purpose only, enforce the cache to be always refreshed
  static boolean TEST_FORCE_REFRESH = false;

  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
    new ConcurrentHashMap<>();
  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
    new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
    new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
    new ConcurrentHashMap<>();
  private volatile boolean exceedThrottleQuotaEnabled = false;
  // factors used to divide cluster scope quota into machine scope quota
  private volatile double machineQuotaFactor = 1;
  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
    new ConcurrentHashMap<>();
  private final RegionServerServices rsServices;

  private QuotaRefresherChore refreshChore;
  // Written by the thread calling start()/stop() and read by the chore through isStopped(),
  // so the flag must be volatile to be reliably visible across threads.
  private volatile boolean stopped = true;

  /**
   * Creates a cache bound to the given region server services. The cache is created in the
   * stopped state; call {@link #start()} to schedule the periodic refresh.
   *
   * @param rsServices the region server services used to reach the configuration, the chore
   *          service and the cluster connection
   */
  public QuotaCache(final RegionServerServices rsServices) {
    this.rsServices = rsServices;
  }

  /**
   * Marks the cache as running and schedules the refresher chore, using the period configured
   * under {@link #REFRESH_CONF_KEY} (5 minutes when not set).
   *
   * @throws IOException declared for callers; see the services used to schedule the chore
   */
  public void start() throws IOException {
    stopped = false;

    // TODO: This will be replaced once we have the notification bus ready.
    Configuration conf = rsServices.getConfiguration();
    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
    refreshChore = new QuotaRefresherChore(period, this);
    rsServices.getChoreService().scheduleChore(refreshChore);
  }

  /**
   * Cancels the refresher chore, if one was created, and marks the cache as stopped.
   *
   * @param why the reason for stopping (currently unused)
   */
  @Override
  public void stop(final String why) {
    final QuotaRefresherChore chore = refreshChore;
    if (chore != null) {
      LOG.debug("Stopping QuotaRefresherChore chore.");
      chore.cancel(true);
    }
    stopped = true;
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }

  /**
   * Returns the limiter associated to the specified user/table.
   *
   * @param ugi the user to limit
   * @param table the table to limit
   * @return the limiter associated to the specified user/table; a no-op limiter for system tables
   */
  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
    if (table.isSystemTable()) {
      return NoopQuotaLimiter.get();
    }
    return getUserQuotaState(ugi).getTableLimiter(table);
  }

  /**
   * Returns the QuotaState associated to the specified user. On a cache miss a new empty state
   * is inserted and a cache refresh is requested.
   *
   * @param ugi the user
   * @return the quota info associated to specified user
   */
  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
      this::triggerCacheRefresh);
  }

  /**
   * Returns the limiter associated to the specified table.
   *
   * @param table the table to limit
   * @return the limiter associated to the specified table
   */
  public QuotaLimiter getTableLimiter(final TableName table) {
    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
  }

  /**
   * Returns the limiter associated to the specified namespace.
   *
   * @param namespace the namespace to limit
   * @return the limiter associated to the specified namespace
   */
  public QuotaLimiter getNamespaceLimiter(final String namespace) {
    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
  }

  /**
   * Returns the limiter associated to the specified region server.
   *
   * @param regionServer the region server to limit
   * @return the limiter associated to the specified region server
   */
  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
  }

  /**
   * Returns the value of the "exceed throttle quota" switch as read by the last refresh that
   * managed to read it.
   */
  protected boolean isExceedThrottleQuotaEnabled() {
    return exceedThrottleQuotaEnabled;
  }

  /**
   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
   * returned and the quota request will be enqueued for the next cache refresh.
   */
  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
    final K key) {
    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
  }

  /**
   * Asks the refresher chore to run as soon as possible. Does nothing when the cache has not
   * been started yet, so that a cache miss before {@link #start()} does not fail the caller.
   */
  void triggerCacheRefresh() {
    final QuotaRefresherChore chore = refreshChore;
    if (chore != null) {
      chore.triggerNow();
    }
  }

  /**
   * Returns the time at which the refresher chore last completed, or 0 when it has never
   * completed or the cache has not been started.
   */
  long getLastUpdate() {
    final QuotaRefresherChore chore = refreshChore;
    return chore == null ? 0 : chore.lastUpdate;
  }

  Map<String, QuotaState> getNamespaceQuotaCache() {
    return namespaceQuotaCache;
  }

  Map<String, QuotaState> getRegionServerQuotaCache() {
    return regionServerQuotaCache;
  }

  Map<TableName, QuotaState> getTableQuotaCache() {
    return tableQuotaCache;
  }

  Map<String, UserQuotaState> getUserQuotaCache() {
    return userQuotaCache;
  }

  // TODO: Remove this once we have the notification bus
  private class QuotaRefresherChore extends ScheduledChore {
    // Written by the chore and read through QuotaCache#getLastUpdate() from other threads.
    private volatile long lastUpdate = 0;

    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
      super("QuotaRefresherChore", stoppable, period);
    }

    /**
     * Seeds the caches with the online tables and their namespaces, recomputes the quota
     * factors and then refreshes every cache from the quota table.
     */
    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES",
      justification="I do not understand why the complaints, it looks good to me -- FIX")
    protected void chore() {
      // Prefetch online tables/namespaces
      for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
        if (table.isSystemTable()) {
          continue;
        }
        // The containsKey() checks only avoid allocating a QuotaState that would be discarded.
        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
        }
        String ns = table.getNamespaceAsString();
        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
        }
      }
      QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
        new QuotaState());

      // Factors must be refreshed first: the fetchers below pass them to QuotaUtil.
      updateQuotaFactors();
      fetchNamespaceQuotaState();
      fetchTableQuotaState();
      fetchUserQuotaState();
      fetchRegionServerQuotaState();
      fetchExceedThrottleQuota();
      lastUpdate = EnvironmentEdgeManager.currentTime();
    }

    private void fetchNamespaceQuotaState() {
      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
        @Override
        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
        }

        @Override
        public Map<String, QuotaState> fetchEntries(final List<Get> gets)
          throws IOException {
          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
            machineQuotaFactor);
        }
      });
    }

    private void fetchTableQuotaState() {
      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
        @Override
        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
        }

        @Override
        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets)
          throws IOException {
          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
            tableMachineQuotaFactors);
        }
      });
    }

    private void fetchUserQuotaState() {
      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
        @Override
        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
        }

        @Override
        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets)
          throws IOException {
          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
            tableMachineQuotaFactors, machineQuotaFactor);
        }
      });
    }

    private void fetchRegionServerQuotaState() {
      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
        new Fetcher<String, QuotaState>() {
          @Override
          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
          }

          @Override
          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
          }
        });
    }

    /** Refreshes the cached switch; on failure the previously cached value is kept. */
    private void fetchExceedThrottleQuota() {
      try {
        QuotaCache.this.exceedThrottleQuotaEnabled =
          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
      } catch (IOException e) {
        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
      }
    }

    /**
     * Evicts the entries that were not queried for {@code EVICT_PERIOD_FACTOR} refresh periods
     * and re-reads, through the given fetcher, the entries whose last update is at least one
     * refresh period old (or all of the remaining ones when {@code TEST_FORCE_REFRESH} is set).
     *
     * @param type label used in log messages
     * @param quotasMap the cache to evict from and refresh
     * @param fetcher builds the Gets and reads the corresponding quota states
     */
    private <K, V extends QuotaState> void fetch(final String type,
      final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
      long now = EnvironmentEdgeManager.currentTime();
      long refreshPeriod = getPeriod();
      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;

      // Find the quota entries to update
      List<Get> gets = new ArrayList<>();
      List<K> toRemove = new ArrayList<>();
      for (Map.Entry<K, V> entry: quotasMap.entrySet()) {
        long lastUpdate = entry.getValue().getLastUpdate();
        long lastQuery = entry.getValue().getLastQuery();
        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
          toRemove.add(entry.getKey());
        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
          gets.add(fetcher.makeGet(entry));
        }
      }

      for (final K key: toRemove) {
        LOG.trace("evict {} key={}", type, key);
        quotasMap.remove(key);
      }

      // fetch and update the quota entries
      if (gets.isEmpty()) {
        return;
      }
      try {
        for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) {
          V fetched = entry.getValue();
          V cached = quotasMap.putIfAbsent(entry.getKey(), fetched);
          if (cached != null) {
            cached.update(fetched);
          } else {
            // Nothing was cached for this key: the fetched state is now the cached one.
            // Track it so the trace below reports it instead of "null".
            cached = fetched;
          }
          LOG.trace("refresh {} key={} quotas={}", type, entry.getKey(), cached);
        }
      } catch (IOException e) {
        LOG.warn("Unable to read " + type + " from quota table", e);
      }
    }

    /**
     * Update quota factors which is used to divide cluster scope quota into machine scope quota
     *
     * For user/namespace/user over namespace quota, use [1 / RSNum] as machine factor.
     * For table/user over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum]
     * as machine factor.
     */
    private void updateQuotaFactors() {
      // Update machine quota factor. The Admin is Closeable and a new one is obtained on every
      // run, so it is released with try-with-resources instead of being leaked.
      try (Admin admin = rsServices.getConnection().getAdmin()) {
        int rsSize = admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME))
          .getServersName().size();
        if (rsSize != 0) {
          // TODO if use rs group, the cluster limit should be shared by the rs group
          machineQuotaFactor = 1.0 / rsSize;
        }
      } catch (IOException e) {
        // Keep the previous factor when the cluster metrics cannot be read.
        LOG.warn("Get live region servers failed", e);
      }

      // Update table machine quota factors
      for (TableName tableName : tableQuotaCache.keySet()) {
        // Stays at 1 when the region lookup below fails.
        double factor = 1;
        try {
          long regionSize =
            MetaTableAccessor.getTableRegions(rsServices.getConnection(), tableName, true)
              .stream().filter(regionInfo -> !regionInfo.isOffline()).count();
          if (regionSize == 0) {
            factor = 0;
          } else {
            int localRegionSize = rsServices.getRegions(tableName).size();
            factor = 1.0 * localRegionSize / regionSize;
          }
        } catch (IOException e) {
          LOG.warn("Get table regions failed: {}", tableName, e);
        }
        tableMachineQuotaFactors.put(tableName, factor);
      }
    }
  }

  /**
   * Strategy used by the refresher chore to read one kind of quota from the quota table.
   *
   * @param <Key> type of the cache key
   * @param <Value> type of the cached quota state
   */
  static interface Fetcher<Key, Value> {
    /** Builds the Get that reads the quota for the given cache entry. */
    Get makeGet(Map.Entry<Key, Value> entry);

    /** Executes the given Gets and returns the quota states that were read, by key. */
    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
  }
}