001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.EnumSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics.Option;
033import org.apache.hadoop.hbase.MetaTableAccessor;
034import org.apache.hadoop.hbase.ScheduledChore;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.regionserver.HRegionServer;
039import org.apache.hadoop.hbase.regionserver.RegionServerServices;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.security.UserGroupInformation;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Cache that keeps track of the quota settings for the users and tables that
049 * are interacting with it.
050 *
051 * To avoid blocking the operations if the requested quota is not in cache
052 * an "empty quota" will be returned and the request to fetch the quota information
053 * will be enqueued for the next refresh.
054 *
055 * TODO: At the moment the Cache has a Chore that will be triggered every 5min
056 * or on cache-miss events. Later the Quotas will be pushed using the notification system.
057 */
058@InterfaceAudience.Private
059@InterfaceStability.Evolving
060public class QuotaCache implements Stoppable {
061  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
062
063  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
064  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
065  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
066
067  // for testing purpose only, enforce the cache to be always refreshed
068  static boolean TEST_FORCE_REFRESH = false;
069
070  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
071  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
072  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
073  private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
074      new ConcurrentHashMap<>();
075  private volatile boolean exceedThrottleQuotaEnabled = false;
076  // factors used to divide cluster scope quota into machine scope quota
077  private volatile double machineQuotaFactor = 1;
078  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
079      new ConcurrentHashMap<>();
080  private final RegionServerServices rsServices;
081
082  private QuotaRefresherChore refreshChore;
083  private boolean stopped = true;
084
085  public QuotaCache(final RegionServerServices rsServices) {
086    this.rsServices = rsServices;
087  }
088
089  public void start() throws IOException {
090    stopped = false;
091
092    // TODO: This will be replaced once we have the notification bus ready.
093    Configuration conf = rsServices.getConfiguration();
094    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
095    refreshChore = new QuotaRefresherChore(period, this);
096    rsServices.getChoreService().scheduleChore(refreshChore);
097  }
098
099  @Override
100  public void stop(final String why) {
101    if (refreshChore != null) {
102      LOG.debug("Stopping QuotaRefresherChore chore.");
103      refreshChore.cancel(true);
104    }
105    stopped = true;
106  }
107
108  @Override
109  public boolean isStopped() {
110    return stopped;
111  }
112
113  /**
114   * Returns the limiter associated to the specified user/table.
115   *
116   * @param ugi the user to limit
117   * @param table the table to limit
118   * @return the limiter associated to the specified user/table
119   */
120  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
121    if (table.isSystemTable()) {
122      return NoopQuotaLimiter.get();
123    }
124    return getUserQuotaState(ugi).getTableLimiter(table);
125  }
126
127  /**
128   * Returns the QuotaState associated to the specified user.
129   * @param ugi the user
130   * @return the quota info associated to specified user
131   */
132  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
133    return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
134      this::triggerCacheRefresh);
135  }
136
137  /**
138   * Returns the limiter associated to the specified table.
139   *
140   * @param table the table to limit
141   * @return the limiter associated to the specified table
142   */
143  public QuotaLimiter getTableLimiter(final TableName table) {
144    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
145  }
146
147  /**
148   * Returns the limiter associated to the specified namespace.
149   *
150   * @param namespace the namespace to limit
151   * @return the limiter associated to the specified namespace
152   */
153  public QuotaLimiter getNamespaceLimiter(final String namespace) {
154    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
155  }
156
157  /**
158   * Returns the limiter associated to the specified region server.
159   *
160   * @param regionServer the region server to limit
161   * @return the limiter associated to the specified region server
162   */
163  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
164    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
165  }
166
167  protected boolean isExceedThrottleQuotaEnabled() {
168    return exceedThrottleQuotaEnabled;
169  }
170
171  /**
172   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
173   * returned and the quota request will be enqueued for the next cache refresh.
174   */
175  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
176      final K key) {
177    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
178  }
179
180  void triggerCacheRefresh() {
181    refreshChore.triggerNow();
182  }
183
184  long getLastUpdate() {
185    return refreshChore.lastUpdate;
186  }
187
188  Map<String, QuotaState> getNamespaceQuotaCache() {
189    return namespaceQuotaCache;
190  }
191
192  Map<String, QuotaState> getRegionServerQuotaCache() {
193    return regionServerQuotaCache;
194  }
195
196  Map<TableName, QuotaState> getTableQuotaCache() {
197    return tableQuotaCache;
198  }
199
200  Map<String, UserQuotaState> getUserQuotaCache() {
201    return userQuotaCache;
202  }
203
204  // TODO: Remove this once we have the notification bus
205  private class QuotaRefresherChore extends ScheduledChore {
206    private long lastUpdate = 0;
207
208    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
209      super("QuotaRefresherChore", stoppable, period);
210    }
211
212    @Override
213    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES",
214      justification="I do not understand why the complaints, it looks good to me -- FIX")
215    protected void chore() {
216      // Prefetch online tables/namespaces
217      for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
218        if (table.isSystemTable()) continue;
219        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
220          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
221        }
222        String ns = table.getNamespaceAsString();
223        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
224          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
225        }
226      }
227      QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
228        new QuotaState());
229
230      updateQuotaFactors();
231      fetchNamespaceQuotaState();
232      fetchTableQuotaState();
233      fetchUserQuotaState();
234      fetchRegionServerQuotaState();
235      fetchExceedThrottleQuota();
236      lastUpdate = EnvironmentEdgeManager.currentTime();
237    }
238
239    private void fetchNamespaceQuotaState() {
240      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
241        @Override
242        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
243          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
244        }
245
246        @Override
247        public Map<String, QuotaState> fetchEntries(final List<Get> gets)
248            throws IOException {
249          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
250            machineQuotaFactor);
251        }
252      });
253    }
254
255    private void fetchTableQuotaState() {
256      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
257        @Override
258        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
259          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
260        }
261
262        @Override
263        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets)
264            throws IOException {
265          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
266            tableMachineQuotaFactors);
267        }
268      });
269    }
270
271    private void fetchUserQuotaState() {
272      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
273      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
274      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
275        @Override
276        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
277          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
278        }
279
280        @Override
281        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets)
282            throws IOException {
283          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
284            tableMachineQuotaFactors, machineQuotaFactor);
285        }
286      });
287    }
288
289    private void fetchRegionServerQuotaState() {
290      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
291        new Fetcher<String, QuotaState>() {
292          @Override
293          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
294            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
295          }
296
297          @Override
298          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
299            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
300          }
301        });
302    }
303
304    private void fetchExceedThrottleQuota() {
305      try {
306        QuotaCache.this.exceedThrottleQuotaEnabled =
307            QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
308      } catch (IOException e) {
309        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
310      }
311    }
312
313    private <K, V extends QuotaState> void fetch(final String type,
314        final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
315      long now = EnvironmentEdgeManager.currentTime();
316      long refreshPeriod = getPeriod();
317      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
318
319      // Find the quota entries to update
320      List<Get> gets = new ArrayList<>();
321      List<K> toRemove = new ArrayList<>();
322      for (Map.Entry<K, V> entry: quotasMap.entrySet()) {
323        long lastUpdate = entry.getValue().getLastUpdate();
324        long lastQuery = entry.getValue().getLastQuery();
325        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
326          toRemove.add(entry.getKey());
327        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
328          gets.add(fetcher.makeGet(entry));
329        }
330      }
331
332      for (final K key: toRemove) {
333        if (LOG.isTraceEnabled()) {
334          LOG.trace("evict " + type + " key=" + key);
335        }
336        quotasMap.remove(key);
337      }
338
339      // fetch and update the quota entries
340      if (!gets.isEmpty()) {
341        try {
342          for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) {
343            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
344            if (quotaInfo != null) {
345              quotaInfo.update(entry.getValue());
346            }
347
348            if (LOG.isTraceEnabled()) {
349              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
350            }
351          }
352        } catch (IOException e) {
353          LOG.warn("Unable to read " + type + " from quota table", e);
354        }
355      }
356    }
357
358    /**
359     * Update quota factors which is used to divide cluster scope quota into machine scope quota
360     *
361     * For user/namespace/user over namespace quota, use [1 / RSNum] as machine factor.
362     * For table/user over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum]
363     * as machine factor.
364     */
365    private void updateQuotaFactors() {
366      // Update machine quota factor
367      try {
368        int rsSize = rsServices.getConnection().getAdmin()
369            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)).getServersName().size();
370        if (rsSize != 0) {
371          // TODO if use rs group, the cluster limit should be shared by the rs group
372          machineQuotaFactor = 1.0 / rsSize;
373        }
374      } catch (IOException e) {
375        LOG.warn("Get live region servers failed", e);
376      }
377
378      // Update table machine quota factors
379      for (TableName tableName : tableQuotaCache.keySet()) {
380        double factor = 1;
381        try {
382          long regionSize =
383              MetaTableAccessor.getTableRegions(rsServices.getConnection(), tableName, true)
384                  .stream().filter(regionInfo -> !regionInfo.isOffline()).count();
385          if (regionSize == 0) {
386            factor = 0;
387          } else {
388            int localRegionSize = rsServices.getRegions(tableName).size();
389            factor = 1.0 * localRegionSize / regionSize;
390          }
391        } catch (IOException e) {
392          LOG.warn("Get table regions failed: {}", tableName, e);
393        }
394        tableMachineQuotaFactors.put(tableName, factor);
395      }
396    }
397  }
398
399  static interface Fetcher<Key, Value> {
400    Get makeGet(Map.Entry<Key, Value> entry);
401    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
402  }
403}