/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
044import org.slf4j.LoggerFactory; 045 046/** 047 * Cache that keeps track of the quota settings for the users and tables that are interacting with 048 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will 049 * be returned and the request to fetch the quota information will be enqueued for the next refresh. 050 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss 051 * events. Later the Quotas will be pushed using the notification system. 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Evolving 055public class QuotaCache implements Stoppable { 056 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 057 058 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 059 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min 060 private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD 061 062 // for testing purpose only, enforce the cache to be always refreshed 063 static boolean TEST_FORCE_REFRESH = false; 064 065 private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = 066 new ConcurrentHashMap<>(); 067 private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = 068 new ConcurrentHashMap<>(); 069 private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = 070 new ConcurrentHashMap<>(); 071 private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache = 072 new ConcurrentHashMap<>(); 073 private volatile boolean exceedThrottleQuotaEnabled = false; 074 // factors used to divide cluster scope quota into machine scope quota 075 private volatile double machineQuotaFactor = 1; 076 private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors = 077 new ConcurrentHashMap<>(); 078 private final RegionServerServices rsServices; 079 080 private QuotaRefresherChore refreshChore; 081 private boolean stopped = true; 082 083 public 
QuotaCache(final RegionServerServices rsServices) { 084 this.rsServices = rsServices; 085 } 086 087 public void start() throws IOException { 088 stopped = false; 089 090 // TODO: This will be replaced once we have the notification bus ready. 091 Configuration conf = rsServices.getConfiguration(); 092 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 093 refreshChore = new QuotaRefresherChore(period, this); 094 rsServices.getChoreService().scheduleChore(refreshChore); 095 } 096 097 @Override 098 public void stop(final String why) { 099 if (refreshChore != null) { 100 LOG.debug("Stopping QuotaRefresherChore chore."); 101 refreshChore.shutdown(true); 102 } 103 stopped = true; 104 } 105 106 @Override 107 public boolean isStopped() { 108 return stopped; 109 } 110 111 /** 112 * Returns the limiter associated to the specified user/table. 113 * @param ugi the user to limit 114 * @param table the table to limit 115 * @return the limiter associated to the specified user/table 116 */ 117 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 118 if (table.isSystemTable()) { 119 return NoopQuotaLimiter.get(); 120 } 121 return getUserQuotaState(ugi).getTableLimiter(table); 122 } 123 124 /** 125 * Returns the QuotaState associated to the specified user. 126 * @param ugi the user 127 * @return the quota info associated to specified user 128 */ 129 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 130 return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new, 131 this::triggerCacheRefresh); 132 } 133 134 /** 135 * Returns the limiter associated to the specified table. 
136 * @param table the table to limit 137 * @return the limiter associated to the specified table 138 */ 139 public QuotaLimiter getTableLimiter(final TableName table) { 140 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter(); 141 } 142 143 /** 144 * Returns the limiter associated to the specified namespace. 145 * @param namespace the namespace to limit 146 * @return the limiter associated to the specified namespace 147 */ 148 public QuotaLimiter getNamespaceLimiter(final String namespace) { 149 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter(); 150 } 151 152 /** 153 * Returns the limiter associated to the specified region server. 154 * @param regionServer the region server to limit 155 * @return the limiter associated to the specified region server 156 */ 157 public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { 158 return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter(); 159 } 160 161 protected boolean isExceedThrottleQuotaEnabled() { 162 return exceedThrottleQuotaEnabled; 163 } 164 165 /** 166 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be 167 * returned and the quota request will be enqueued for the next cache refresh. 
168 */ 169 private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, 170 final K key) { 171 return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); 172 } 173 174 void triggerCacheRefresh() { 175 refreshChore.triggerNow(); 176 } 177 178 long getLastUpdate() { 179 return refreshChore.lastUpdate; 180 } 181 182 Map<String, QuotaState> getNamespaceQuotaCache() { 183 return namespaceQuotaCache; 184 } 185 186 Map<String, QuotaState> getRegionServerQuotaCache() { 187 return regionServerQuotaCache; 188 } 189 190 Map<TableName, QuotaState> getTableQuotaCache() { 191 return tableQuotaCache; 192 } 193 194 Map<String, UserQuotaState> getUserQuotaCache() { 195 return userQuotaCache; 196 } 197 198 // TODO: Remove this once we have the notification bus 199 private class QuotaRefresherChore extends ScheduledChore { 200 private long lastUpdate = 0; 201 202 public QuotaRefresherChore(final int period, final Stoppable stoppable) { 203 super("QuotaRefresherChore", stoppable, period); 204 } 205 206 @Override 207 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES", 208 justification = "I do not understand why the complaints, it looks good to me -- FIX") 209 protected void chore() { 210 // Prefetch online tables/namespaces 211 for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) { 212 if (table.isSystemTable()) continue; 213 if (!QuotaCache.this.tableQuotaCache.containsKey(table)) { 214 QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState()); 215 } 216 String ns = table.getNamespaceAsString(); 217 if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) { 218 QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState()); 219 } 220 } 221 QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, 222 new QuotaState()); 223 224 updateQuotaFactors(); 225 fetchNamespaceQuotaState(); 226 fetchTableQuotaState(); 227 
fetchUserQuotaState(); 228 fetchRegionServerQuotaState(); 229 fetchExceedThrottleQuota(); 230 lastUpdate = EnvironmentEdgeManager.currentTime(); 231 } 232 233 private void fetchNamespaceQuotaState() { 234 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() { 235 @Override 236 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 237 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey()); 238 } 239 240 @Override 241 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 242 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, 243 machineQuotaFactor); 244 } 245 }); 246 } 247 248 private void fetchTableQuotaState() { 249 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() { 250 @Override 251 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) { 252 return QuotaUtil.makeGetForTableQuotas(entry.getKey()); 253 } 254 255 @Override 256 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 257 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, 258 tableMachineQuotaFactors); 259 } 260 }); 261 } 262 263 private void fetchUserQuotaState() { 264 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); 265 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); 266 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() { 267 @Override 268 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) { 269 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces); 270 } 271 272 @Override 273 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException { 274 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, 275 tableMachineQuotaFactors, machineQuotaFactor); 276 } 277 }); 278 } 279 280 private void fetchRegionServerQuotaState() { 281 fetch("regionServer", 
QuotaCache.this.regionServerQuotaCache, 282 new Fetcher<String, QuotaState>() { 283 @Override 284 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 285 return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey()); 286 } 287 288 @Override 289 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 290 return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets); 291 } 292 }); 293 } 294 295 private void fetchExceedThrottleQuota() { 296 try { 297 QuotaCache.this.exceedThrottleQuotaEnabled = 298 QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection()); 299 } catch (IOException e) { 300 LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e); 301 } 302 } 303 304 private <K, V extends QuotaState> void fetch(final String type, 305 final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { 306 long now = EnvironmentEdgeManager.currentTime(); 307 long refreshPeriod = getPeriod(); 308 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; 309 310 // Find the quota entries to update 311 List<Get> gets = new ArrayList<>(); 312 List<K> toRemove = new ArrayList<>(); 313 for (Map.Entry<K, V> entry : quotasMap.entrySet()) { 314 long lastUpdate = entry.getValue().getLastUpdate(); 315 long lastQuery = entry.getValue().getLastQuery(); 316 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { 317 toRemove.add(entry.getKey()); 318 } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) { 319 gets.add(fetcher.makeGet(entry)); 320 } 321 } 322 323 for (final K key : toRemove) { 324 if (LOG.isTraceEnabled()) { 325 LOG.trace("evict " + type + " key=" + key); 326 } 327 quotasMap.remove(key); 328 } 329 330 // fetch and update the quota entries 331 if (!gets.isEmpty()) { 332 try { 333 for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { 334 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 335 if (quotaInfo != null) { 336 
quotaInfo.update(entry.getValue()); 337 } 338 339 if (LOG.isTraceEnabled()) { 340 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); 341 } 342 } 343 } catch (IOException e) { 344 LOG.warn("Unable to read " + type + " from quota table", e); 345 } 346 } 347 } 348 349 /** 350 * Update quota factors which is used to divide cluster scope quota into machine scope quota For 351 * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user 352 * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor. 353 */ 354 private void updateQuotaFactors() { 355 // Update machine quota factor 356 ClusterMetrics clusterMetrics; 357 try { 358 clusterMetrics = rsServices.getConnection().getAdmin() 359 .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)); 360 } catch (IOException e) { 361 LOG.warn("Failed to get cluster metrics needed for updating quotas", e); 362 return; 363 } 364 365 int rsSize = clusterMetrics.getServersName().size(); 366 if (rsSize != 0) { 367 // TODO if use rs group, the cluster limit should be shared by the rs group 368 machineQuotaFactor = 1.0 / rsSize; 369 } 370 371 Map<TableName, RegionStatesCount> tableRegionStatesCount = 372 clusterMetrics.getTableRegionStatesCount(); 373 374 // Update table machine quota factors 375 for (TableName tableName : tableQuotaCache.keySet()) { 376 double factor = 1; 377 try { 378 long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions(); 379 if (regionSize == 0) { 380 factor = 0; 381 } else { 382 int localRegionSize = rsServices.getRegions(tableName).size(); 383 factor = 1.0 * localRegionSize / regionSize; 384 } 385 } catch (IOException e) { 386 LOG.warn("Get table regions failed: {}", tableName, e); 387 } 388 tableMachineQuotaFactors.put(tableName, factor); 389 } 390 } 391 } 392 393 static interface Fetcher<Key, Value> { 394 Get makeGet(Map.Entry<Key, Value> entry); 395 396 Map<Key, Value> 
fetchEntries(List<Get> gets) throws IOException; 397 } 398}