1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.io.IOException;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.ScheduledChore;
25 import org.apache.hadoop.hbase.Stoppable;
26 import org.apache.hadoop.hbase.TableName;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceStability;
29 import org.apache.hadoop.hbase.client.Get;
30 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
31 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
32 import org.apache.hadoop.security.UserGroupInformation;
33
34 import com.google.common.annotations.VisibleForTesting;
35
36
37
38
39
40
41
42
43 @InterfaceAudience.Private
44 @InterfaceStability.Evolving
45 public class QuotaCache implements Stoppable {
46 private static final Log LOG = LogFactory.getLog(QuotaCache.class);
47
48 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
49 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000;
50 private static final int EVICT_PERIOD_FACTOR = 5;
51
52
53 private static boolean TEST_FORCE_REFRESH = false;
54
55 private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
56 new ConcurrentHashMap<String, QuotaState>();
57 private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
58 new ConcurrentHashMap<TableName, QuotaState>();
59 private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
60 new ConcurrentHashMap<String, UserQuotaState>();
61 private final RegionServerServices rsServices;
62
63 private QuotaRefresherChore refreshChore;
64 private boolean stopped = true;
65
66 public QuotaCache(final RegionServerServices rsServices) {
67 this.rsServices = rsServices;
68 }
69
70 public void start() throws IOException {
71 stopped = false;
72
73
74 Configuration conf = rsServices.getConfiguration();
75 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
76 refreshChore = new QuotaRefresherChore(period, this);
77 rsServices.getChoreService().scheduleChore(refreshChore);
78 }
79
80 @Override
81 public void stop(final String why) {
82 stopped = true;
83 }
84
85 @Override
86 public boolean isStopped() {
87 return stopped;
88 }
89
90
91
92
93
94
95
96 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
97 if (table.isSystemTable()) {
98 return NoopQuotaLimiter.get();
99 }
100 return getUserQuotaState(ugi).getTableLimiter(table);
101 }
102
103
104
105
106
107
108 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
109 String key = ugi.getShortUserName();
110 UserQuotaState quotaInfo = userQuotaCache.get(key);
111 if (quotaInfo == null) {
112 quotaInfo = new UserQuotaState();
113 if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) {
114 triggerCacheRefresh();
115 }
116 }
117 return quotaInfo;
118 }
119
120
121
122
123
124
125 public QuotaLimiter getTableLimiter(final TableName table) {
126 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
127 }
128
129
130
131
132
133
134 public QuotaLimiter getNamespaceLimiter(final String namespace) {
135 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
136 }
137
138
139
140
141
142 private <K> QuotaState
143 getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, final K key) {
144 QuotaState quotaInfo = quotasMap.get(key);
145 if (quotaInfo == null) {
146 quotaInfo = new QuotaState();
147 if (quotasMap.putIfAbsent(key, quotaInfo) == null) {
148 triggerCacheRefresh();
149 }
150 }
151 return quotaInfo;
152 }
153
154 @VisibleForTesting
155 void triggerCacheRefresh() {
156 refreshChore.triggerNow();
157 }
158
159 @VisibleForTesting
160 long getLastUpdate() {
161 return refreshChore.lastUpdate;
162 }
163
164 @VisibleForTesting
165 Map<String, QuotaState> getNamespaceQuotaCache() {
166 return namespaceQuotaCache;
167 }
168
169 @VisibleForTesting
170 Map<TableName, QuotaState> getTableQuotaCache() {
171 return tableQuotaCache;
172 }
173
174 @VisibleForTesting
175 Map<String, UserQuotaState> getUserQuotaCache() {
176 return userQuotaCache;
177 }
178
179 public static boolean isTEST_FORCE_REFRESH() {
180 return TEST_FORCE_REFRESH;
181 }
182
183 public static void setTEST_FORCE_REFRESH(boolean tEST_FORCE_REFRESH) {
184 TEST_FORCE_REFRESH = tEST_FORCE_REFRESH;
185 }
186
187
188 private class QuotaRefresherChore extends ScheduledChore {
189 private long lastUpdate = 0;
190
191 public QuotaRefresherChore(final int period, final Stoppable stoppable) {
192 super("QuotaRefresherChore", stoppable, period);
193 }
194
195 @Override
196 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
197 justification = "I do not understand why the complaints, it looks good to me -- FIX")
198 protected void chore() {
199
200 for (TableName table : QuotaCache.this.rsServices.getOnlineTables()) {
201 if (table.isSystemTable()) continue;
202 if (!QuotaCache.this.tableQuotaCache.contains(table)) {
203 QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
204 }
205 String ns = table.getNamespaceAsString();
206 if (!QuotaCache.this.namespaceQuotaCache.contains(ns)) {
207 QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
208 }
209 }
210
211 fetchNamespaceQuotaState();
212 fetchTableQuotaState();
213 fetchUserQuotaState();
214 lastUpdate = EnvironmentEdgeManager.currentTime();
215 }
216
217 private void fetchNamespaceQuotaState() {
218 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
219 @Override
220 public Get makeGet(final Map.Entry<String, QuotaState> entry) {
221 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
222 }
223
224 @Override
225 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
226 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets);
227 }
228 });
229 }
230
231 private void fetchTableQuotaState() {
232 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
233 @Override
234 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
235 return QuotaUtil.makeGetForTableQuotas(entry.getKey());
236 }
237
238 @Override
239 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
240 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets);
241 }
242 });
243 }
244
245 private void fetchUserQuotaState() {
246 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
247 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
248 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
249 @Override
250 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
251 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
252 }
253
254 @Override
255 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
256 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets);
257 }
258 });
259 }
260
261 private <K, V extends QuotaState> void fetch(final String type,
262 final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
263 long now = EnvironmentEdgeManager.currentTime();
264 long refreshPeriod = getPeriod();
265 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
266
267
268 List<Get> gets = new ArrayList<Get>();
269 List<K> toRemove = new ArrayList<K>();
270 for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
271 long lastUpdate = entry.getValue().getLastUpdate();
272 long lastQuery = entry.getValue().getLastQuery();
273 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
274 toRemove.add(entry.getKey());
275 } else if (isTEST_FORCE_REFRESH() || (now - lastUpdate) >= refreshPeriod) {
276 gets.add(fetcher.makeGet(entry));
277 }
278 }
279
280 for (final K key : toRemove) {
281 if (LOG.isTraceEnabled()) {
282 LOG.trace("evict " + type + " key=" + key);
283 }
284 quotasMap.remove(key);
285 }
286
287
288 if (!gets.isEmpty()) {
289 try {
290 for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
291 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
292 if (quotaInfo != null) {
293 quotaInfo.update(entry.getValue());
294 }
295
296 if (LOG.isTraceEnabled()) {
297 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
298 }
299 }
300 } catch (IOException e) {
301 LOG.warn("Unable to read " + type + " from quota table", e);
302 }
303 }
304 }
305 }
306
307 static interface Fetcher<Key, Value> {
308 Get makeGet(Map.Entry<Key, Value> entry);
309
310 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
311 }
312 }