001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
022
023import org.apache.hadoop.hbase.regionserver.HRegionServer;
024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ScheduledChore;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.yetus.audience.InterfaceStability;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.regionserver.RegionServerServices;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.security.UserGroupInformation;
045
046/**
047 * Cache that keeps track of the quota settings for the users and tables that
048 * are interacting with it.
049 *
050 * To avoid blocking the operations if the requested quota is not in cache
051 * an "empty quota" will be returned and the request to fetch the quota information
052 * will be enqueued for the next refresh.
053 *
054 * TODO: At the moment the Cache has a Chore that will be triggered every 5min
055 * or on cache-miss events. Later the Quotas will be pushed using the notification system.
056 */
057@InterfaceAudience.Private
058@InterfaceStability.Evolving
059public class QuotaCache implements Stoppable {
060  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
061
062  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
063  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
064  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
065
066  // for testing purpose only, enforce the cache to be always refreshed
067  static boolean TEST_FORCE_REFRESH = false;
068
069  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
070  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
071  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
072  private final RegionServerServices rsServices;
073
074  private QuotaRefresherChore refreshChore;
075  private boolean stopped = true;
076
077  public QuotaCache(final RegionServerServices rsServices) {
078    this.rsServices = rsServices;
079  }
080
081  public void start() throws IOException {
082    stopped = false;
083
084    // TODO: This will be replaced once we have the notification bus ready.
085    Configuration conf = rsServices.getConfiguration();
086    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
087    refreshChore = new QuotaRefresherChore(period, this);
088    rsServices.getChoreService().scheduleChore(refreshChore);
089  }
090
091  @Override
092  public void stop(final String why) {
093    if (refreshChore != null) {
094      LOG.debug("Stopping QuotaRefresherChore chore.");
095      refreshChore.cancel(true);
096    }
097    stopped = true;
098  }
099
100  @Override
101  public boolean isStopped() {
102    return stopped;
103  }
104
105  /**
106   * Returns the limiter associated to the specified user/table.
107   *
108   * @param ugi the user to limit
109   * @param table the table to limit
110   * @return the limiter associated to the specified user/table
111   */
112  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
113    if (table.isSystemTable()) {
114      return NoopQuotaLimiter.get();
115    }
116    return getUserQuotaState(ugi).getTableLimiter(table);
117  }
118
119  /**
120   * Returns the QuotaState associated to the specified user.
121   * @param ugi the user
122   * @return the quota info associated to specified user
123   */
124  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
125    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
126      this::triggerCacheRefresh);
127  }
128
129  /**
130   * Returns the limiter associated to the specified table.
131   *
132   * @param table the table to limit
133   * @return the limiter associated to the specified table
134   */
135  public QuotaLimiter getTableLimiter(final TableName table) {
136    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
137  }
138
139  /**
140   * Returns the limiter associated to the specified namespace.
141   *
142   * @param namespace the namespace to limit
143   * @return the limiter associated to the specified namespace
144   */
145  public QuotaLimiter getNamespaceLimiter(final String namespace) {
146    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
147  }
148
149  /**
150   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
151   * returned and the quota request will be enqueued for the next cache refresh.
152   */
153  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
154      final K key) {
155    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
156  }
157
158  @VisibleForTesting
159  void triggerCacheRefresh() {
160    refreshChore.triggerNow();
161  }
162
163  @VisibleForTesting
164  long getLastUpdate() {
165    return refreshChore.lastUpdate;
166  }
167
168  @VisibleForTesting
169  Map<String, QuotaState> getNamespaceQuotaCache() {
170    return namespaceQuotaCache;
171  }
172
173  @VisibleForTesting
174  Map<TableName, QuotaState> getTableQuotaCache() {
175    return tableQuotaCache;
176  }
177
178  @VisibleForTesting
179  Map<String, UserQuotaState> getUserQuotaCache() {
180    return userQuotaCache;
181  }
182
183  // TODO: Remove this once we have the notification bus
184  private class QuotaRefresherChore extends ScheduledChore {
185    private long lastUpdate = 0;
186
187    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
188      super("QuotaRefresherChore", stoppable, period);
189    }
190
191    @Override
192    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES",
193      justification="I do not understand why the complaints, it looks good to me -- FIX")
194    protected void chore() {
195      // Prefetch online tables/namespaces
196      for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
197        if (table.isSystemTable()) continue;
198        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
199          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
200        }
201        String ns = table.getNamespaceAsString();
202        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
203          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
204        }
205      }
206
207      fetchNamespaceQuotaState();
208      fetchTableQuotaState();
209      fetchUserQuotaState();
210      lastUpdate = EnvironmentEdgeManager.currentTime();
211    }
212
213    private void fetchNamespaceQuotaState() {
214      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
215        @Override
216        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
217          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
218        }
219
220        @Override
221        public Map<String, QuotaState> fetchEntries(final List<Get> gets)
222            throws IOException {
223          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets);
224        }
225      });
226    }
227
228    private void fetchTableQuotaState() {
229      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
230        @Override
231        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
232          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
233        }
234
235        @Override
236        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets)
237            throws IOException {
238          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets);
239        }
240      });
241    }
242
243    private void fetchUserQuotaState() {
244      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
245      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
246      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
247        @Override
248        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
249          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
250        }
251
252        @Override
253        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets)
254            throws IOException {
255          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets);
256        }
257      });
258    }
259
260    private <K, V extends QuotaState> void fetch(final String type,
261        final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
262      long now = EnvironmentEdgeManager.currentTime();
263      long refreshPeriod = getPeriod();
264      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
265
266      // Find the quota entries to update
267      List<Get> gets = new ArrayList<>();
268      List<K> toRemove = new ArrayList<>();
269      for (Map.Entry<K, V> entry: quotasMap.entrySet()) {
270        long lastUpdate = entry.getValue().getLastUpdate();
271        long lastQuery = entry.getValue().getLastQuery();
272        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
273          toRemove.add(entry.getKey());
274        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
275          gets.add(fetcher.makeGet(entry));
276        }
277      }
278
279      for (final K key: toRemove) {
280        if (LOG.isTraceEnabled()) {
281          LOG.trace("evict " + type + " key=" + key);
282        }
283        quotasMap.remove(key);
284      }
285
286      // fetch and update the quota entries
287      if (!gets.isEmpty()) {
288        try {
289          for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) {
290            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
291            if (quotaInfo != null) {
292              quotaInfo.update(entry.getValue());
293            }
294
295            if (LOG.isTraceEnabled()) {
296              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
297            }
298          }
299        } catch (IOException e) {
300          LOG.warn("Unable to read " + type + " from quota table", e);
301        }
302      }
303    }
304  }
305
306  static interface Fetcher<Key, Value> {
307    Get makeGet(Map.Entry<Key, Value> entry);
308    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
309  }
310}