001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.EnumSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ClusterMetrics;
031import org.apache.hadoop.hbase.ClusterMetrics.Option;
032import org.apache.hadoop.hbase.ScheduledChore;
033import org.apache.hadoop.hbase.Stoppable;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.RegionStatesCount;
037import org.apache.hadoop.hbase.regionserver.HRegionServer;
038import org.apache.hadoop.hbase.regionserver.RegionServerServices;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.security.UserGroupInformation;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Cache that keeps track of the quota settings for the users and tables that are interacting with
048 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
049 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
050 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
051 * events. Later the Quotas will be pushed using the notification system.
052 */
053@InterfaceAudience.Private
054@InterfaceStability.Evolving
055public class QuotaCache implements Stoppable {
056  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
057
058  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
059  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
060  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
061
062  // for testing purpose only, enforce the cache to be always refreshed
063  static boolean TEST_FORCE_REFRESH = false;
064
065  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
066    new ConcurrentHashMap<>();
067  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
068    new ConcurrentHashMap<>();
069  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
070    new ConcurrentHashMap<>();
071  private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
072    new ConcurrentHashMap<>();
073  private volatile boolean exceedThrottleQuotaEnabled = false;
074  // factors used to divide cluster scope quota into machine scope quota
075  private volatile double machineQuotaFactor = 1;
076  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
077    new ConcurrentHashMap<>();
078  private final RegionServerServices rsServices;
079
080  private QuotaRefresherChore refreshChore;
081  private boolean stopped = true;
082
083  public QuotaCache(final RegionServerServices rsServices) {
084    this.rsServices = rsServices;
085  }
086
087  public void start() throws IOException {
088    stopped = false;
089
090    // TODO: This will be replaced once we have the notification bus ready.
091    Configuration conf = rsServices.getConfiguration();
092    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
093    refreshChore = new QuotaRefresherChore(period, this);
094    rsServices.getChoreService().scheduleChore(refreshChore);
095  }
096
097  @Override
098  public void stop(final String why) {
099    if (refreshChore != null) {
100      LOG.debug("Stopping QuotaRefresherChore chore.");
101      refreshChore.shutdown(true);
102    }
103    stopped = true;
104  }
105
106  @Override
107  public boolean isStopped() {
108    return stopped;
109  }
110
111  /**
112   * Returns the limiter associated to the specified user/table.
113   * @param ugi   the user to limit
114   * @param table the table to limit
115   * @return the limiter associated to the specified user/table
116   */
117  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
118    if (table.isSystemTable()) {
119      return NoopQuotaLimiter.get();
120    }
121    return getUserQuotaState(ugi).getTableLimiter(table);
122  }
123
124  /**
125   * Returns the QuotaState associated to the specified user.
126   * @param ugi the user
127   * @return the quota info associated to specified user
128   */
129  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
130    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
131      this::triggerCacheRefresh);
132  }
133
134  /**
135   * Returns the limiter associated to the specified table.
136   * @param table the table to limit
137   * @return the limiter associated to the specified table
138   */
139  public QuotaLimiter getTableLimiter(final TableName table) {
140    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
141  }
142
143  /**
144   * Returns the limiter associated to the specified namespace.
145   * @param namespace the namespace to limit
146   * @return the limiter associated to the specified namespace
147   */
148  public QuotaLimiter getNamespaceLimiter(final String namespace) {
149    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
150  }
151
152  /**
153   * Returns the limiter associated to the specified region server.
154   * @param regionServer the region server to limit
155   * @return the limiter associated to the specified region server
156   */
157  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
158    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
159  }
160
161  protected boolean isExceedThrottleQuotaEnabled() {
162    return exceedThrottleQuotaEnabled;
163  }
164
165  /**
166   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
167   * returned and the quota request will be enqueued for the next cache refresh.
168   */
169  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
170    final K key) {
171    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
172  }
173
174  void triggerCacheRefresh() {
175    refreshChore.triggerNow();
176  }
177
178  long getLastUpdate() {
179    return refreshChore.lastUpdate;
180  }
181
182  Map<String, QuotaState> getNamespaceQuotaCache() {
183    return namespaceQuotaCache;
184  }
185
186  Map<String, QuotaState> getRegionServerQuotaCache() {
187    return regionServerQuotaCache;
188  }
189
190  Map<TableName, QuotaState> getTableQuotaCache() {
191    return tableQuotaCache;
192  }
193
194  Map<String, UserQuotaState> getUserQuotaCache() {
195    return userQuotaCache;
196  }
197
198  // TODO: Remove this once we have the notification bus
199  private class QuotaRefresherChore extends ScheduledChore {
200    private long lastUpdate = 0;
201
202    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
203      super("QuotaRefresherChore", stoppable, period);
204    }
205
206    @Override
207    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
208        justification = "I do not understand why the complaints, it looks good to me -- FIX")
209    protected void chore() {
210      // Prefetch online tables/namespaces
211      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
212        if (table.isSystemTable()) continue;
213        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
214          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
215        }
216        String ns = table.getNamespaceAsString();
217        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
218          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
219        }
220      }
221      QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
222        new QuotaState());
223
224      updateQuotaFactors();
225      fetchNamespaceQuotaState();
226      fetchTableQuotaState();
227      fetchUserQuotaState();
228      fetchRegionServerQuotaState();
229      fetchExceedThrottleQuota();
230      lastUpdate = EnvironmentEdgeManager.currentTime();
231    }
232
233    private void fetchNamespaceQuotaState() {
234      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
235        @Override
236        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
237          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
238        }
239
240        @Override
241        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
242          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
243            machineQuotaFactor);
244        }
245      });
246    }
247
248    private void fetchTableQuotaState() {
249      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
250        @Override
251        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
252          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
253        }
254
255        @Override
256        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
257          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
258            tableMachineQuotaFactors);
259        }
260      });
261    }
262
263    private void fetchUserQuotaState() {
264      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
265      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
266      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
267        @Override
268        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
269          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
270        }
271
272        @Override
273        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
274          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
275            tableMachineQuotaFactors, machineQuotaFactor);
276        }
277      });
278    }
279
280    private void fetchRegionServerQuotaState() {
281      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
282        new Fetcher<String, QuotaState>() {
283          @Override
284          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
285            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
286          }
287
288          @Override
289          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
290            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
291          }
292        });
293    }
294
295    private void fetchExceedThrottleQuota() {
296      try {
297        QuotaCache.this.exceedThrottleQuotaEnabled =
298          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
299      } catch (IOException e) {
300        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
301      }
302    }
303
304    private <K, V extends QuotaState> void fetch(final String type,
305      final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
306      long now = EnvironmentEdgeManager.currentTime();
307      long refreshPeriod = getPeriod();
308      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
309
310      // Find the quota entries to update
311      List<Get> gets = new ArrayList<>();
312      List<K> toRemove = new ArrayList<>();
313      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
314        long lastUpdate = entry.getValue().getLastUpdate();
315        long lastQuery = entry.getValue().getLastQuery();
316        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
317          toRemove.add(entry.getKey());
318        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
319          gets.add(fetcher.makeGet(entry));
320        }
321      }
322
323      for (final K key : toRemove) {
324        if (LOG.isTraceEnabled()) {
325          LOG.trace("evict " + type + " key=" + key);
326        }
327        quotasMap.remove(key);
328      }
329
330      // fetch and update the quota entries
331      if (!gets.isEmpty()) {
332        try {
333          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
334            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
335            if (quotaInfo != null) {
336              quotaInfo.update(entry.getValue());
337            }
338
339            if (LOG.isTraceEnabled()) {
340              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
341            }
342          }
343        } catch (IOException e) {
344          LOG.warn("Unable to read " + type + " from quota table", e);
345        }
346      }
347    }
348
349    /**
350     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
351     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
352     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
353     */
354    private void updateQuotaFactors() {
355      // Update machine quota factor
356      ClusterMetrics clusterMetrics;
357      try {
358        clusterMetrics = rsServices.getConnection().getAdmin()
359          .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
360      } catch (IOException e) {
361        LOG.warn("Failed to get cluster metrics needed for updating quotas", e);
362        return;
363      }
364
365      int rsSize = clusterMetrics.getServersName().size();
366      if (rsSize != 0) {
367        // TODO if use rs group, the cluster limit should be shared by the rs group
368        machineQuotaFactor = 1.0 / rsSize;
369      }
370
371      Map<TableName, RegionStatesCount> tableRegionStatesCount =
372        clusterMetrics.getTableRegionStatesCount();
373
374      // Update table machine quota factors
375      for (TableName tableName : tableQuotaCache.keySet()) {
376        double factor = 1;
377        try {
378          long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
379          if (regionSize == 0) {
380            factor = 0;
381          } else {
382            int localRegionSize = rsServices.getRegions(tableName).size();
383            factor = 1.0 * localRegionSize / regionSize;
384          }
385        } catch (IOException e) {
386          LOG.warn("Get table regions failed: {}", tableName, e);
387        }
388        tableMachineQuotaFactors.put(tableName, factor);
389      }
390    }
391  }
392
393  static interface Fetcher<Key, Value> {
394    Get makeGet(Map.Entry<Key, Value> entry);
395
396    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
397  }
398}