001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
046
047/**
048 * Cache that keeps track of the quota settings for the users and tables that are interacting with
049 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
050 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
051 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
052 * events. Later the Quotas will be pushed using the notification system.
053 */
054@InterfaceAudience.Private
055@InterfaceStability.Evolving
056public class QuotaCache implements Stoppable {
057  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
058
059  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
060  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
061  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
062
063  // for testing purpose only, enforce the cache to be always refreshed
064  static boolean TEST_FORCE_REFRESH = false;
065
066  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
067  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
068  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
069  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
070    new ConcurrentHashMap<>();
071  private volatile boolean exceedThrottleQuotaEnabled = false;
072  // factors used to divide cluster scope quota into machine scope quota
073  private volatile double machineQuotaFactor = 1;
074  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
075    new ConcurrentHashMap<>();
076  private final RegionServerServices rsServices;
077
078  private QuotaRefresherChore refreshChore;
079  private boolean stopped = true;
080
081  public QuotaCache(final RegionServerServices rsServices) {
082    this.rsServices = rsServices;
083  }
084
085  public void start() throws IOException {
086    stopped = false;
087
088    // TODO: This will be replaced once we have the notification bus ready.
089    Configuration conf = rsServices.getConfiguration();
090    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
091    refreshChore = new QuotaRefresherChore(period, this);
092    rsServices.getChoreService().scheduleChore(refreshChore);
093  }
094
095  @Override
096  public void stop(final String why) {
097    if (refreshChore != null) {
098      LOG.debug("Stopping QuotaRefresherChore chore.");
099      refreshChore.shutdown(true);
100    }
101    stopped = true;
102  }
103
104  @Override
105  public boolean isStopped() {
106    return stopped;
107  }
108
109  /**
110   * Returns the limiter associated to the specified user/table.
111   * @param ugi   the user to limit
112   * @param table the table to limit
113   * @return the limiter associated to the specified user/table
114   */
115  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
116    if (table.isSystemTable()) {
117      return NoopQuotaLimiter.get();
118    }
119    return getUserQuotaState(ugi).getTableLimiter(table);
120  }
121
122  /**
123   * Returns the QuotaState associated to the specified user.
124   * @param ugi the user
125   * @return the quota info associated to specified user
126   */
127  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
128    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
129      this::triggerCacheRefresh);
130  }
131
132  /**
133   * Returns the limiter associated to the specified table.
134   * @param table the table to limit
135   * @return the limiter associated to the specified table
136   */
137  public QuotaLimiter getTableLimiter(final TableName table) {
138    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
139  }
140
141  /**
142   * Returns the limiter associated to the specified namespace.
143   * @param namespace the namespace to limit
144   * @return the limiter associated to the specified namespace
145   */
146  public QuotaLimiter getNamespaceLimiter(final String namespace) {
147    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
148  }
149
150  /**
151   * Returns the limiter associated to the specified region server.
152   * @param regionServer the region server to limit
153   * @return the limiter associated to the specified region server
154   */
155  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
156    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
157  }
158
159  protected boolean isExceedThrottleQuotaEnabled() {
160    return exceedThrottleQuotaEnabled;
161  }
162
163  /**
164   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
165   * returned and the quota request will be enqueued for the next cache refresh.
166   */
167  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
168    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
169  }
170
171  void triggerCacheRefresh() {
172    refreshChore.triggerNow();
173  }
174
175  long getLastUpdate() {
176    return refreshChore.lastUpdate;
177  }
178
179  Map<String, QuotaState> getNamespaceQuotaCache() {
180    return namespaceQuotaCache;
181  }
182
183  Map<String, QuotaState> getRegionServerQuotaCache() {
184    return regionServerQuotaCache;
185  }
186
187  Map<TableName, QuotaState> getTableQuotaCache() {
188    return tableQuotaCache;
189  }
190
191  Map<String, UserQuotaState> getUserQuotaCache() {
192    return userQuotaCache;
193  }
194
195  // TODO: Remove this once we have the notification bus
196  private class QuotaRefresherChore extends ScheduledChore {
197    private long lastUpdate = 0;
198
199    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
200      super("QuotaRefresherChore", stoppable, period);
201    }
202
203    @Override
204    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
205        justification = "I do not understand why the complaints, it looks good to me -- FIX")
206    protected void chore() {
207      // Prefetch online tables/namespaces
208      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
209        if (table.isSystemTable()) {
210          continue;
211        }
212        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
213
214        final String ns = table.getNamespaceAsString();
215
216        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
217      }
218
219      QuotaCache.this.regionServerQuotaCache
220        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
221
222      updateQuotaFactors();
223      fetchNamespaceQuotaState();
224      fetchTableQuotaState();
225      fetchUserQuotaState();
226      fetchRegionServerQuotaState();
227      fetchExceedThrottleQuota();
228      lastUpdate = EnvironmentEdgeManager.currentTime();
229    }
230
231    private void fetchNamespaceQuotaState() {
232      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
233        @Override
234        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
235          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
236        }
237
238        @Override
239        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
240          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
241            machineQuotaFactor);
242        }
243      });
244    }
245
246    private void fetchTableQuotaState() {
247      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
248        @Override
249        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
250          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
251        }
252
253        @Override
254        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
255          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
256            tableMachineQuotaFactors);
257        }
258      });
259    }
260
261    private void fetchUserQuotaState() {
262      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
263      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
264      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
265        @Override
266        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
267          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
268        }
269
270        @Override
271        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
272          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
273            tableMachineQuotaFactors, machineQuotaFactor);
274        }
275      });
276    }
277
278    private void fetchRegionServerQuotaState() {
279      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
280        new Fetcher<String, QuotaState>() {
281          @Override
282          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
283            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
284          }
285
286          @Override
287          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
288            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
289          }
290        });
291    }
292
293    private void fetchExceedThrottleQuota() {
294      try {
295        QuotaCache.this.exceedThrottleQuotaEnabled =
296          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
297      } catch (IOException e) {
298        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
299      }
300    }
301
302    private <K, V extends QuotaState> void fetch(final String type,
303      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
304      long now = EnvironmentEdgeManager.currentTime();
305      long refreshPeriod = getPeriod();
306      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
307
308      // Find the quota entries to update
309      List<Get> gets = new ArrayList<>();
310      List<K> toRemove = new ArrayList<>();
311      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
312        long lastUpdate = entry.getValue().getLastUpdate();
313        long lastQuery = entry.getValue().getLastQuery();
314        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
315          toRemove.add(entry.getKey());
316        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
317          gets.add(fetcher.makeGet(entry));
318        }
319      }
320
321      for (final K key : toRemove) {
322        if (LOG.isTraceEnabled()) {
323          LOG.trace("evict " + type + " key=" + key);
324        }
325        quotasMap.remove(key);
326      }
327
328      // fetch and update the quota entries
329      if (!gets.isEmpty()) {
330        try {
331          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
332            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
333            if (quotaInfo != null) {
334              quotaInfo.update(entry.getValue());
335            }
336
337            if (LOG.isTraceEnabled()) {
338              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
339            }
340          }
341        } catch (IOException e) {
342          LOG.warn("Unable to read " + type + " from quota table", e);
343        }
344      }
345    }
346
347    /**
348     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
349     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
350     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
351     */
352    private void updateQuotaFactors() {
353      // Update machine quota factor
354      ClusterMetrics clusterMetrics;
355      try {
356        clusterMetrics = rsServices.getConnection().getAdmin()
357          .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
358      } catch (IOException e) {
359        LOG.warn("Failed to get cluster metrics needed for updating quotas", e);
360        return;
361      }
362
363      int rsSize = clusterMetrics.getServersName().size();
364      if (rsSize != 0) {
365        // TODO if use rs group, the cluster limit should be shared by the rs group
366        machineQuotaFactor = 1.0 / rsSize;
367      }
368
369      Map<TableName, RegionStatesCount> tableRegionStatesCount =
370        clusterMetrics.getTableRegionStatesCount();
371
372      // Update table machine quota factors
373      for (TableName tableName : tableQuotaCache.keySet()) {
374        double factor = 1;
375        try {
376          long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
377          if (regionSize == 0) {
378            factor = 0;
379          } else {
380            int localRegionSize = rsServices.getRegions(tableName).size();
381            factor = 1.0 * localRegionSize / regionSize;
382          }
383        } catch (IOException e) {
384          LOG.warn("Get table regions failed: {}", tableName, e);
385        }
386        tableMachineQuotaFactors.put(tableName, factor);
387      }
388    }
389  }
390
391  static interface Fetcher<Key, Value> {
392    Get makeGet(Map.Entry<Key, Value> entry);
393
394    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
395  }
396}