001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import org.apache.hadoop.hbase.ClusterMetrics.Option;
024import org.apache.hadoop.hbase.MetaTableAccessor;
025import org.apache.hadoop.hbase.regionserver.HRegionServer;
026import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.EnumSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.ConcurrentHashMap;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.ScheduledChore;
038import org.apache.hadoop.hbase.Stoppable;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.regionserver.RegionServerServices;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.security.UserGroupInformation;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.yetus.audience.InterfaceStability;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Cache that keeps track of the quota settings for the users and tables that
051 * are interacting with it.
052 *
053 * To avoid blocking the operations if the requested quota is not in cache
054 * an "empty quota" will be returned and the request to fetch the quota information
055 * will be enqueued for the next refresh.
056 *
057 * TODO: At the moment the Cache has a Chore that will be triggered every 5min
058 * or on cache-miss events. Later the Quotas will be pushed using the notification system.
059 */
060@InterfaceAudience.Private
061@InterfaceStability.Evolving
062public class QuotaCache implements Stoppable {
063  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
064
065  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
066  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
067  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
068
069  // for testing purpose only, enforce the cache to be always refreshed
070  static boolean TEST_FORCE_REFRESH = false;
071
072  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
073  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
074  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
075  private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
076      new ConcurrentHashMap<>();
077  private volatile boolean exceedThrottleQuotaEnabled = false;
078  // factors used to divide cluster scope quota into machine scope quota
079  private volatile double machineQuotaFactor = 1;
080  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
081      new ConcurrentHashMap<>();
082  private final RegionServerServices rsServices;
083
084  private QuotaRefresherChore refreshChore;
085  private boolean stopped = true;
086
087  public QuotaCache(final RegionServerServices rsServices) {
088    this.rsServices = rsServices;
089  }
090
091  public void start() throws IOException {
092    stopped = false;
093
094    // TODO: This will be replaced once we have the notification bus ready.
095    Configuration conf = rsServices.getConfiguration();
096    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
097    refreshChore = new QuotaRefresherChore(period, this);
098    rsServices.getChoreService().scheduleChore(refreshChore);
099  }
100
101  @Override
102  public void stop(final String why) {
103    if (refreshChore != null) {
104      LOG.debug("Stopping QuotaRefresherChore chore.");
105      refreshChore.cancel(true);
106    }
107    stopped = true;
108  }
109
110  @Override
111  public boolean isStopped() {
112    return stopped;
113  }
114
115  /**
116   * Returns the limiter associated to the specified user/table.
117   *
118   * @param ugi the user to limit
119   * @param table the table to limit
120   * @return the limiter associated to the specified user/table
121   */
122  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
123    if (table.isSystemTable()) {
124      return NoopQuotaLimiter.get();
125    }
126    return getUserQuotaState(ugi).getTableLimiter(table);
127  }
128
129  /**
130   * Returns the QuotaState associated to the specified user.
131   * @param ugi the user
132   * @return the quota info associated to specified user
133   */
134  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
135    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
136      this::triggerCacheRefresh);
137  }
138
139  /**
140   * Returns the limiter associated to the specified table.
141   *
142   * @param table the table to limit
143   * @return the limiter associated to the specified table
144   */
145  public QuotaLimiter getTableLimiter(final TableName table) {
146    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
147  }
148
149  /**
150   * Returns the limiter associated to the specified namespace.
151   *
152   * @param namespace the namespace to limit
153   * @return the limiter associated to the specified namespace
154   */
155  public QuotaLimiter getNamespaceLimiter(final String namespace) {
156    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
157  }
158
159  /**
160   * Returns the limiter associated to the specified region server.
161   *
162   * @param regionServer the region server to limit
163   * @return the limiter associated to the specified region server
164   */
165  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
166    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
167  }
168
169  protected boolean isExceedThrottleQuotaEnabled() {
170    return exceedThrottleQuotaEnabled;
171  }
172
173  /**
174   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
175   * returned and the quota request will be enqueued for the next cache refresh.
176   */
177  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
178      final K key) {
179    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
180  }
181
182  @VisibleForTesting
183  void triggerCacheRefresh() {
184    refreshChore.triggerNow();
185  }
186
187  @VisibleForTesting
188  long getLastUpdate() {
189    return refreshChore.lastUpdate;
190  }
191
192  @VisibleForTesting
193  Map<String, QuotaState> getNamespaceQuotaCache() {
194    return namespaceQuotaCache;
195  }
196
197  @VisibleForTesting
198  Map<String, QuotaState> getRegionServerQuotaCache() {
199    return regionServerQuotaCache;
200  }
201
202  @VisibleForTesting
203  Map<TableName, QuotaState> getTableQuotaCache() {
204    return tableQuotaCache;
205  }
206
207  @VisibleForTesting
208  Map<String, UserQuotaState> getUserQuotaCache() {
209    return userQuotaCache;
210  }
211
212  // TODO: Remove this once we have the notification bus
213  private class QuotaRefresherChore extends ScheduledChore {
214    private long lastUpdate = 0;
215
216    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
217      super("QuotaRefresherChore", stoppable, period);
218    }
219
220    @Override
221    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES",
222      justification="I do not understand why the complaints, it looks good to me -- FIX")
223    protected void chore() {
224      // Prefetch online tables/namespaces
225      for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
226        if (table.isSystemTable()) continue;
227        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
228          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
229        }
230        String ns = table.getNamespaceAsString();
231        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
232          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
233        }
234      }
235      QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
236        new QuotaState());
237
238      updateQuotaFactors();
239      fetchNamespaceQuotaState();
240      fetchTableQuotaState();
241      fetchUserQuotaState();
242      fetchRegionServerQuotaState();
243      fetchExceedThrottleQuota();
244      lastUpdate = EnvironmentEdgeManager.currentTime();
245    }
246
247    private void fetchNamespaceQuotaState() {
248      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
249        @Override
250        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
251          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
252        }
253
254        @Override
255        public Map<String, QuotaState> fetchEntries(final List<Get> gets)
256            throws IOException {
257          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
258            machineQuotaFactor);
259        }
260      });
261    }
262
263    private void fetchTableQuotaState() {
264      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
265        @Override
266        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
267          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
268        }
269
270        @Override
271        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets)
272            throws IOException {
273          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
274            tableMachineQuotaFactors);
275        }
276      });
277    }
278
279    private void fetchUserQuotaState() {
280      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
281      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
282      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
283        @Override
284        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
285          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
286        }
287
288        @Override
289        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets)
290            throws IOException {
291          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
292            tableMachineQuotaFactors, machineQuotaFactor);
293        }
294      });
295    }
296
297    private void fetchRegionServerQuotaState() {
298      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
299        new Fetcher<String, QuotaState>() {
300          @Override
301          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
302            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
303          }
304
305          @Override
306          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
307            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
308          }
309        });
310    }
311
312    private void fetchExceedThrottleQuota() {
313      try {
314        QuotaCache.this.exceedThrottleQuotaEnabled =
315            QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
316      } catch (IOException e) {
317        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
318      }
319    }
320
321    private <K, V extends QuotaState> void fetch(final String type,
322        final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
323      long now = EnvironmentEdgeManager.currentTime();
324      long refreshPeriod = getPeriod();
325      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
326
327      // Find the quota entries to update
328      List<Get> gets = new ArrayList<>();
329      List<K> toRemove = new ArrayList<>();
330      for (Map.Entry<K, V> entry: quotasMap.entrySet()) {
331        long lastUpdate = entry.getValue().getLastUpdate();
332        long lastQuery = entry.getValue().getLastQuery();
333        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
334          toRemove.add(entry.getKey());
335        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
336          gets.add(fetcher.makeGet(entry));
337        }
338      }
339
340      for (final K key: toRemove) {
341        if (LOG.isTraceEnabled()) {
342          LOG.trace("evict " + type + " key=" + key);
343        }
344        quotasMap.remove(key);
345      }
346
347      // fetch and update the quota entries
348      if (!gets.isEmpty()) {
349        try {
350          for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) {
351            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
352            if (quotaInfo != null) {
353              quotaInfo.update(entry.getValue());
354            }
355
356            if (LOG.isTraceEnabled()) {
357              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
358            }
359          }
360        } catch (IOException e) {
361          LOG.warn("Unable to read " + type + " from quota table", e);
362        }
363      }
364    }
365
366    /**
367     * Update quota factors which is used to divide cluster scope quota into machine scope quota
368     *
369     * For user/namespace/user over namespace quota, use [1 / RSNum] as machine factor.
370     * For table/user over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum]
371     * as machine factor.
372     */
373    private void updateQuotaFactors() {
374      // Update machine quota factor
375      try {
376        int rsSize = rsServices.getConnection().getAdmin()
377            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)).getServersName().size();
378        if (rsSize != 0) {
379          // TODO if use rs group, the cluster limit should be shared by the rs group
380          machineQuotaFactor = 1.0 / rsSize;
381        }
382      } catch (IOException e) {
383        LOG.warn("Get live region servers failed", e);
384      }
385
386      // Update table machine quota factors
387      for (TableName tableName : tableQuotaCache.keySet()) {
388        double factor = 1;
389        try {
390          long regionSize =
391              MetaTableAccessor.getTableRegions(rsServices.getConnection(), tableName, true)
392                  .stream().filter(regionInfo -> !regionInfo.isOffline()).count();
393          if (regionSize == 0) {
394            factor = 0;
395          } else {
396            int localRegionSize = rsServices.getRegions(tableName).size();
397            factor = 1.0 * localRegionSize / regionSize;
398          }
399        } catch (IOException e) {
400          LOG.warn("Get table regions failed: {}", tableName, e);
401        }
402        tableMachineQuotaFactors.put(tableName, factor);
403      }
404    }
405  }
406
407  static interface Fetcher<Key, Value> {
408    Get makeGet(Map.Entry<Key, Value> entry);
409    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
410  }
411}