001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; 022 023import org.apache.hadoop.hbase.regionserver.HRegionServer; 024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.ScheduledChore; 035import org.apache.hadoop.hbase.Stoppable; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.yetus.audience.InterfaceStability; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041import org.apache.hadoop.hbase.client.Get; 042import org.apache.hadoop.hbase.regionserver.RegionServerServices; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.security.UserGroupInformation; 045 046/** 047 * Cache that keeps track of the quota settings for the users and tables that 048 * are interacting with it. 049 * 050 * To avoid blocking the operations if the requested quota is not in cache 051 * an "empty quota" will be returned and the request to fetch the quota information 052 * will be enqueued for the next refresh. 053 * 054 * TODO: At the moment the Cache has a Chore that will be triggered every 5min 055 * or on cache-miss events. Later the Quotas will be pushed using the notification system. 056 */ 057@InterfaceAudience.Private 058@InterfaceStability.Evolving 059public class QuotaCache implements Stoppable { 060 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 061 062 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 063 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min 064 private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD 065 066 // for testing purpose only, enforce the cache to be always refreshed 067 static boolean TEST_FORCE_REFRESH = false; 068 069 private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>(); 070 private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>(); 071 private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>(); 072 private final RegionServerServices rsServices; 073 074 private QuotaRefresherChore refreshChore; 075 private boolean stopped = true; 076 077 public QuotaCache(final RegionServerServices rsServices) { 078 this.rsServices = rsServices; 079 } 080 081 public void start() throws IOException { 082 stopped = false; 083 084 // TODO: This will be replaced once we have the notification bus ready. 085 Configuration conf = rsServices.getConfiguration(); 086 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 087 refreshChore = new QuotaRefresherChore(period, this); 088 rsServices.getChoreService().scheduleChore(refreshChore); 089 } 090 091 @Override 092 public void stop(final String why) { 093 if (refreshChore != null) { 094 LOG.debug("Stopping QuotaRefresherChore chore."); 095 refreshChore.cancel(true); 096 } 097 stopped = true; 098 } 099 100 @Override 101 public boolean isStopped() { 102 return stopped; 103 } 104 105 /** 106 * Returns the limiter associated to the specified user/table. 107 * 108 * @param ugi the user to limit 109 * @param table the table to limit 110 * @return the limiter associated to the specified user/table 111 */ 112 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 113 if (table.isSystemTable()) { 114 return NoopQuotaLimiter.get(); 115 } 116 return getUserQuotaState(ugi).getTableLimiter(table); 117 } 118 119 /** 120 * Returns the QuotaState associated to the specified user. 121 * @param ugi the user 122 * @return the quota info associated to specified user 123 */ 124 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 125 return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new, 126 this::triggerCacheRefresh); 127 } 128 129 /** 130 * Returns the limiter associated to the specified table. 131 * 132 * @param table the table to limit 133 * @return the limiter associated to the specified table 134 */ 135 public QuotaLimiter getTableLimiter(final TableName table) { 136 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter(); 137 } 138 139 /** 140 * Returns the limiter associated to the specified namespace. 141 * 142 * @param namespace the namespace to limit 143 * @return the limiter associated to the specified namespace 144 */ 145 public QuotaLimiter getNamespaceLimiter(final String namespace) { 146 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter(); 147 } 148 149 /** 150 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be 151 * returned and the quota request will be enqueued for the next cache refresh. 152 */ 153 private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, 154 final K key) { 155 return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); 156 } 157 158 @VisibleForTesting 159 void triggerCacheRefresh() { 160 refreshChore.triggerNow(); 161 } 162 163 @VisibleForTesting 164 long getLastUpdate() { 165 return refreshChore.lastUpdate; 166 } 167 168 @VisibleForTesting 169 Map<String, QuotaState> getNamespaceQuotaCache() { 170 return namespaceQuotaCache; 171 } 172 173 @VisibleForTesting 174 Map<TableName, QuotaState> getTableQuotaCache() { 175 return tableQuotaCache; 176 } 177 178 @VisibleForTesting 179 Map<String, UserQuotaState> getUserQuotaCache() { 180 return userQuotaCache; 181 } 182 183 // TODO: Remove this once we have the notification bus 184 private class QuotaRefresherChore extends ScheduledChore { 185 private long lastUpdate = 0; 186 187 public QuotaRefresherChore(final int period, final Stoppable stoppable) { 188 super("QuotaRefresherChore", stoppable, period); 189 } 190 191 @Override 192 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES", 193 justification="I do not understand why the complaints, it looks good to me -- FIX") 194 protected void chore() { 195 // Prefetch online tables/namespaces 196 for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) { 197 if (table.isSystemTable()) continue; 198 if (!QuotaCache.this.tableQuotaCache.containsKey(table)) { 199 QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState()); 200 } 201 String ns = table.getNamespaceAsString(); 202 if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) { 203 QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState()); 204 } 205 } 206 207 fetchNamespaceQuotaState(); 208 fetchTableQuotaState(); 209 fetchUserQuotaState(); 210 lastUpdate = EnvironmentEdgeManager.currentTime(); 211 } 212 213 private void fetchNamespaceQuotaState() { 214 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() { 215 @Override 216 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 217 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey()); 218 } 219 220 @Override 221 public Map<String, QuotaState> fetchEntries(final List<Get> gets) 222 throws IOException { 223 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets); 224 } 225 }); 226 } 227 228 private void fetchTableQuotaState() { 229 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() { 230 @Override 231 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) { 232 return QuotaUtil.makeGetForTableQuotas(entry.getKey()); 233 } 234 235 @Override 236 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) 237 throws IOException { 238 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets); 239 } 240 }); 241 } 242 243 private void fetchUserQuotaState() { 244 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); 245 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); 246 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() { 247 @Override 248 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) { 249 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces); 250 } 251 252 @Override 253 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) 254 throws IOException { 255 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets); 256 } 257 }); 258 } 259 260 private <K, V extends QuotaState> void fetch(final String type, 261 final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { 262 long now = EnvironmentEdgeManager.currentTime(); 263 long refreshPeriod = getPeriod(); 264 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; 265 266 // Find the quota entries to update 267 List<Get> gets = new ArrayList<>(); 268 List<K> toRemove = new ArrayList<>(); 269 for (Map.Entry<K, V> entry: quotasMap.entrySet()) { 270 long lastUpdate = entry.getValue().getLastUpdate(); 271 long lastQuery = entry.getValue().getLastQuery(); 272 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { 273 toRemove.add(entry.getKey()); 274 } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) { 275 gets.add(fetcher.makeGet(entry)); 276 } 277 } 278 279 for (final K key: toRemove) { 280 if (LOG.isTraceEnabled()) { 281 LOG.trace("evict " + type + " key=" + key); 282 } 283 quotasMap.remove(key); 284 } 285 286 // fetch and update the quota entries 287 if (!gets.isEmpty()) { 288 try { 289 for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) { 290 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 291 if (quotaInfo != null) { 292 quotaInfo.update(entry.getValue()); 293 } 294 295 if (LOG.isTraceEnabled()) { 296 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); 297 } 298 } 299 } catch (IOException e) { 300 LOG.warn("Unable to read " + type + " from quota table", e); 301 } 302 } 303 } 304 } 305 306 static interface Fetcher<Key, Value> { 307 Get makeGet(Map.Entry<Key, Value> entry); 308 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; 309 } 310}