View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  
12  package org.apache.hadoop.hbase.quotas;
13  
14  import java.io.IOException;
15  import java.util.ArrayList;
16  import java.util.List;
17  import java.util.Map;
18  import java.util.Set;
19  import java.util.concurrent.ConcurrentHashMap;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.ScheduledChore;
25  import org.apache.hadoop.hbase.Stoppable;
26  import org.apache.hadoop.hbase.TableName;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.hbase.client.Get;
30  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
31  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
32  import org.apache.hadoop.security.UserGroupInformation;
33  
34  import com.google.common.annotations.VisibleForTesting;
35  
36  /**
37   * Cache that keeps track of the quota settings for the users and tables that are interacting with
38   * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
39   * be returned and the request to fetch the quota information will be enqueued for the next refresh.
40   * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
41   * events. Later the Quotas will be pushed using the notification system.
42   */
43  @InterfaceAudience.Private
44  @InterfaceStability.Evolving
45  public class QuotaCache implements Stoppable {
46    private static final Log LOG = LogFactory.getLog(QuotaCache.class);
47  
48    public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
49    private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
50    private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
51  
52    // for testing purpose only, enforce the cache to be always refreshed
53    private static boolean TEST_FORCE_REFRESH = false;
54  
55    private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
56        new ConcurrentHashMap<String, QuotaState>();
57    private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
58        new ConcurrentHashMap<TableName, QuotaState>();
59    private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
60        new ConcurrentHashMap<String, UserQuotaState>();
61    private final RegionServerServices rsServices;
62  
63    private QuotaRefresherChore refreshChore;
64    private boolean stopped = true;
65  
66    public QuotaCache(final RegionServerServices rsServices) {
67      this.rsServices = rsServices;
68    }
69  
70    public void start() throws IOException {
71      stopped = false;
72  
73      // TODO: This will be replaced once we have the notification bus ready.
74      Configuration conf = rsServices.getConfiguration();
75      int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
76      refreshChore = new QuotaRefresherChore(period, this);
77      rsServices.getChoreService().scheduleChore(refreshChore);
78    }
79  
80    @Override
81    public void stop(final String why) {
82      stopped = true;
83    }
84  
85    @Override
86    public boolean isStopped() {
87      return stopped;
88    }
89  
90    /**
91     * Returns the limiter associated to the specified user/table.
92     * @param ugi the user to limit
93     * @param table the table to limit
94     * @return the limiter associated to the specified user/table
95     */
96    public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
97      if (table.isSystemTable()) {
98        return NoopQuotaLimiter.get();
99      }
100     return getUserQuotaState(ugi).getTableLimiter(table);
101   }
102 
103   /**
104    * Returns the QuotaState associated to the specified user.
105    * @param ugi the user
106    * @return the quota info associated to specified user
107    */
108   public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
109     String key = ugi.getShortUserName();
110     UserQuotaState quotaInfo = userQuotaCache.get(key);
111     if (quotaInfo == null) {
112       quotaInfo = new UserQuotaState();
113       if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) {
114         triggerCacheRefresh();
115       }
116     }
117     return quotaInfo;
118   }
119 
120   /**
121    * Returns the limiter associated to the specified table.
122    * @param table the table to limit
123    * @return the limiter associated to the specified table
124    */
125   public QuotaLimiter getTableLimiter(final TableName table) {
126     return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
127   }
128 
129   /**
130    * Returns the limiter associated to the specified namespace.
131    * @param namespace the namespace to limit
132    * @return the limiter associated to the specified namespace
133    */
134   public QuotaLimiter getNamespaceLimiter(final String namespace) {
135     return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
136   }
137 
138   /**
139    * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
140    * returned and the quota request will be enqueued for the next cache refresh.
141    */
142   private <K> QuotaState
143       getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, final K key) {
144     QuotaState quotaInfo = quotasMap.get(key);
145     if (quotaInfo == null) {
146       quotaInfo = new QuotaState();
147       if (quotasMap.putIfAbsent(key, quotaInfo) == null) {
148         triggerCacheRefresh();
149       }
150     }
151     return quotaInfo;
152   }
153 
154   @VisibleForTesting
155   void triggerCacheRefresh() {
156     refreshChore.triggerNow();
157   }
158 
159   @VisibleForTesting
160   long getLastUpdate() {
161     return refreshChore.lastUpdate;
162   }
163 
164   @VisibleForTesting
165   Map<String, QuotaState> getNamespaceQuotaCache() {
166     return namespaceQuotaCache;
167   }
168 
169   @VisibleForTesting
170   Map<TableName, QuotaState> getTableQuotaCache() {
171     return tableQuotaCache;
172   }
173 
174   @VisibleForTesting
175   Map<String, UserQuotaState> getUserQuotaCache() {
176     return userQuotaCache;
177   }
178 
179   public static boolean isTEST_FORCE_REFRESH() {
180     return TEST_FORCE_REFRESH;
181   }
182 
183   public static void setTEST_FORCE_REFRESH(boolean tEST_FORCE_REFRESH) {
184     TEST_FORCE_REFRESH = tEST_FORCE_REFRESH;
185   }
186 
187   // TODO: Remove this once we have the notification bus
188   private class QuotaRefresherChore extends ScheduledChore {
189     private long lastUpdate = 0;
190 
191     public QuotaRefresherChore(final int period, final Stoppable stoppable) {
192       super("QuotaRefresherChore", stoppable, period);
193     }
194 
195     @Override
196     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
197         justification = "I do not understand why the complaints, it looks good to me -- FIX")
198     protected void chore() {
199       // Prefetch online tables/namespaces
200       for (TableName table : QuotaCache.this.rsServices.getOnlineTables()) {
201         if (table.isSystemTable()) continue;
202         if (!QuotaCache.this.tableQuotaCache.contains(table)) {
203           QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
204         }
205         String ns = table.getNamespaceAsString();
206         if (!QuotaCache.this.namespaceQuotaCache.contains(ns)) {
207           QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
208         }
209       }
210 
211       fetchNamespaceQuotaState();
212       fetchTableQuotaState();
213       fetchUserQuotaState();
214       lastUpdate = EnvironmentEdgeManager.currentTime();
215     }
216 
217     private void fetchNamespaceQuotaState() {
218       fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
219         @Override
220         public Get makeGet(final Map.Entry<String, QuotaState> entry) {
221           return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
222         }
223 
224         @Override
225         public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
226           return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets);
227         }
228       });
229     }
230 
231     private void fetchTableQuotaState() {
232       fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
233         @Override
234         public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
235           return QuotaUtil.makeGetForTableQuotas(entry.getKey());
236         }
237 
238         @Override
239         public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
240           return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets);
241         }
242       });
243     }
244 
245     private void fetchUserQuotaState() {
246       final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
247       final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
248       fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
249         @Override
250         public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
251           return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
252         }
253 
254         @Override
255         public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
256           return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets);
257         }
258       });
259     }
260 
261     private <K, V extends QuotaState> void fetch(final String type,
262         final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
263       long now = EnvironmentEdgeManager.currentTime();
264       long refreshPeriod = getPeriod();
265       long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
266 
267       // Find the quota entries to update
268       List<Get> gets = new ArrayList<Get>();
269       List<K> toRemove = new ArrayList<K>();
270       for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
271         long lastUpdate = entry.getValue().getLastUpdate();
272         long lastQuery = entry.getValue().getLastQuery();
273         if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
274           toRemove.add(entry.getKey());
275         } else if (isTEST_FORCE_REFRESH() || (now - lastUpdate) >= refreshPeriod) {
276           gets.add(fetcher.makeGet(entry));
277         }
278       }
279 
280       for (final K key : toRemove) {
281         if (LOG.isTraceEnabled()) {
282           LOG.trace("evict " + type + " key=" + key);
283         }
284         quotasMap.remove(key);
285       }
286 
287       // fetch and update the quota entries
288       if (!gets.isEmpty()) {
289         try {
290           for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
291             V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
292             if (quotaInfo != null) {
293               quotaInfo.update(entry.getValue());
294             }
295 
296             if (LOG.isTraceEnabled()) {
297               LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
298             }
299           }
300         } catch (IOException e) {
301           LOG.warn("Unable to read " + type + " from quota table", e);
302         }
303       }
304     }
305   }
306 
307   static interface Fetcher<Key, Value> {
308     Get makeGet(Map.Entry<Key, Value> entry);
309 
310     Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
311   }
312 }