001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.time.Duration;
022import java.util.EnumSet;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Optional;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ClusterMetrics;
030import org.apache.hadoop.hbase.ClusterMetrics.Option;
031import org.apache.hadoop.hbase.ScheduledChore;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.RegionStatesCount;
035import org.apache.hadoop.hbase.ipc.RpcCall;
036import org.apache.hadoop.hbase.ipc.RpcServer;
037import org.apache.hadoop.hbase.regionserver.RegionServerServices;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.security.UserGroupInformation;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
046import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
047import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
048
049/**
050 * Cache that keeps track of the quota settings for the users and tables that are interacting with
051 * it.
052 */
053@InterfaceAudience.Private
054@InterfaceStability.Evolving
055public class QuotaCache implements Stoppable {
056  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
057
058  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
059  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
060    "hbase.quota.cache.ttl.region.states.ms";
061  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
062    "hbase.quota.cache.ttl.servers.size.ms";
063
064  // defines the request attribute key which, when provided, will override the request's username
065  // from the perspective of user quotas
066  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
067    "hbase.quota.user.override.key";
068  private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours
069
070  private final Object initializerLock = new Object();
071  private volatile boolean initialized = false;
072
073  private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>();
074  private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>();
075  private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>();
076  private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>();
077
078  private volatile boolean exceedThrottleQuotaEnabled = false;
079  // factors used to divide cluster scope quota into machine scope quota
080  private volatile double machineQuotaFactor = 1;
081  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
082    new ConcurrentHashMap<>();
083  private final RegionServerServices rsServices;
084  private final String userOverrideRequestAttributeKey;
085
086  private QuotaRefresherChore refreshChore;
087  private boolean stopped = true;
088
089  public QuotaCache(final RegionServerServices rsServices) {
090    this.rsServices = rsServices;
091    this.userOverrideRequestAttributeKey =
092      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
093  }
094
095  public void start() throws IOException {
096    stopped = false;
097
098    Configuration conf = rsServices.getConfiguration();
099    // Refresh the cache every 12 hours, and every time a quota is changed, and every time a
100    // configuration reload is triggered. Periodic reloads are kept to a minimum to avoid
101    // flooding the RegionServer holding the hbase:quota table with requests.
102    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
103    refreshChore = new QuotaRefresherChore(conf, period, this);
104    rsServices.getChoreService().scheduleChore(refreshChore);
105  }
106
107  @Override
108  public void stop(final String why) {
109    if (refreshChore != null) {
110      LOG.debug("Stopping QuotaRefresherChore chore.");
111      refreshChore.shutdown(true);
112    }
113    stopped = true;
114  }
115
116  @Override
117  public boolean isStopped() {
118    return stopped;
119  }
120
121  private void ensureInitialized() {
122    if (!initialized) {
123      synchronized (initializerLock) {
124        if (!initialized) {
125          refreshChore.chore();
126          initialized = true;
127        }
128      }
129    }
130  }
131
132  private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException {
133    return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), tableMachineQuotaFactors,
134      machineQuotaFactor);
135  }
136
137  private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException {
138    return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection());
139  }
140
141  private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException {
142    return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), tableMachineQuotaFactors);
143  }
144
145  private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException {
146    return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), machineQuotaFactor);
147  }
148
149  /**
150   * Returns the limiter associated to the specified user/table.
151   * @param ugi   the user to limit
152   * @param table the table to limit
153   * @return the limiter associated to the specified user/table
154   */
155  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
156    if (table.isSystemTable()) {
157      return NoopQuotaLimiter.get();
158    }
159    return getUserQuotaState(ugi).getTableLimiter(table);
160  }
161
162  /**
163   * Returns the QuotaState associated to the specified user.
164   * @param ugi the user
165   * @return the quota info associated to specified user
166   */
167  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
168    String user = getQuotaUserName(ugi);
169    ensureInitialized();
170    // local reference because the chore thread may assign to userQuotaCache
171    Map<String, UserQuotaState> cache = userQuotaCache;
172    if (!cache.containsKey(user)) {
173      cache.put(user, QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration()));
174    }
175    return cache.get(user);
176  }
177
178  /**
179   * Returns the limiter associated to the specified table.
180   * @param table the table to limit
181   * @return the limiter associated to the specified table
182   */
183  public QuotaLimiter getTableLimiter(final TableName table) {
184    ensureInitialized();
185    // local reference because the chore thread may assign to tableQuotaCache
186    Map<TableName, QuotaState> cache = tableQuotaCache;
187    if (!cache.containsKey(table)) {
188      cache.put(table, new QuotaState());
189    }
190    return cache.get(table).getGlobalLimiter();
191  }
192
193  /**
194   * Returns the limiter associated to the specified namespace.
195   * @param namespace the namespace to limit
196   * @return the limiter associated to the specified namespace
197   */
198  public QuotaLimiter getNamespaceLimiter(final String namespace) {
199    ensureInitialized();
200    // local reference because the chore thread may assign to namespaceQuotaCache
201    Map<String, QuotaState> cache = namespaceQuotaCache;
202    if (!cache.containsKey(namespace)) {
203      cache.put(namespace, new QuotaState());
204    }
205    return cache.get(namespace).getGlobalLimiter();
206  }
207
208  /**
209   * Returns the limiter associated to the specified region server.
210   * @param regionServer the region server to limit
211   * @return the limiter associated to the specified region server
212   */
213  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
214    ensureInitialized();
215    // local reference because the chore thread may assign to regionServerQuotaCache
216    Map<String, QuotaState> cache = regionServerQuotaCache;
217    if (!cache.containsKey(regionServer)) {
218      cache.put(regionServer, new QuotaState());
219    }
220    return cache.get(regionServer).getGlobalLimiter();
221  }
222
223  protected boolean isExceedThrottleQuotaEnabled() {
224    return exceedThrottleQuotaEnabled;
225  }
226
227  /**
228   * Applies a request attribute user override if available, otherwise returns the UGI's short
229   * username
230   * @param ugi The request's UserGroupInformation
231   */
232  private String getQuotaUserName(final UserGroupInformation ugi) {
233    if (userOverrideRequestAttributeKey == null) {
234      return ugi.getShortUserName();
235    }
236
237    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
238    if (!rpcCall.isPresent()) {
239      return ugi.getShortUserName();
240    }
241
242    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
243    if (override == null) {
244      return ugi.getShortUserName();
245    }
246    return Bytes.toString(override);
247  }
248
249  void triggerCacheRefresh() {
250    refreshChore.triggerNow();
251  }
252
253  void forceSynchronousCacheRefresh() {
254    refreshChore.chore();
255  }
256
257  /** visible for testing */
258  Map<String, QuotaState> getNamespaceQuotaCache() {
259    return namespaceQuotaCache;
260  }
261
262  /** visible for testing */
263  Map<String, QuotaState> getRegionServerQuotaCache() {
264    return regionServerQuotaCache;
265  }
266
267  /** visible for testing */
268  Map<TableName, QuotaState> getTableQuotaCache() {
269    return tableQuotaCache;
270  }
271
272  /** visible for testing */
273  Map<String, UserQuotaState> getUserQuotaCache() {
274    return userQuotaCache;
275  }
276
277  // TODO: Remove this once we have the notification bus
278  private class QuotaRefresherChore extends ScheduledChore {
279    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
280    // So we cache the results to reduce that load.
281    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
282    private final RefreshableExpiringValueCache<Integer> regionServersSize;
283
284    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
285      super("QuotaRefresherChore", stoppable, period);
286
287      Duration tableRegionStatesCacheTtl =
288        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
289      this.tableRegionStatesClusterMetrics =
290        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
291          tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
292            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
293
294      Duration regionServersSizeCacheTtl =
295        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
296      regionServersSize =
297        new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
298          () -> rsServices.getConnection().getAdmin().getRegionServers().size());
299    }
300
301    @Override
302    public synchronized boolean triggerNow() {
303      tableRegionStatesClusterMetrics.invalidate();
304      regionServersSize.invalidate();
305      return super.triggerNow();
306    }
307
308    @Override
309    protected void chore() {
310      updateQuotaFactors();
311
312      try {
313        Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries());
314        updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
315        userQuotaCache = newUserQuotaCache;
316      } catch (IOException e) {
317        LOG.error("Error while fetching user quotas", e);
318      }
319
320      try {
321        Map<String, QuotaState> newRegionServerQuotaCache =
322          new HashMap<>(fetchRegionServerQuotaStateEntries());
323        updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
324        regionServerQuotaCache = newRegionServerQuotaCache;
325      } catch (IOException e) {
326        LOG.error("Error while fetching region server quotas", e);
327      }
328
329      try {
330        Map<TableName, QuotaState> newTableQuotaCache =
331          new HashMap<>(fetchTableQuotaStateEntries());
332        updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
333        tableQuotaCache = newTableQuotaCache;
334      } catch (IOException e) {
335        LOG.error("Error while refreshing table quotas", e);
336      }
337
338      try {
339        Map<String, QuotaState> newNamespaceQuotaCache =
340          new HashMap<>(fetchNamespaceQuotaStateEntries());
341        updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
342        namespaceQuotaCache = newNamespaceQuotaCache;
343      } catch (IOException e) {
344        LOG.error("Error while refreshing namespace quotas", e);
345      }
346
347      fetchExceedThrottleQuota();
348    }
349
350    private void fetchExceedThrottleQuota() {
351      try {
352        QuotaCache.this.exceedThrottleQuotaEnabled =
353          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
354      } catch (IOException e) {
355        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
356      }
357    }
358
359    /**
360     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
361     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
362     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
363     */
364    private void updateQuotaFactors() {
365      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
366        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
367      if (hasTableQuotas) {
368        updateTableMachineQuotaFactors();
369      } else {
370        updateOnlyMachineQuotaFactors();
371      }
372    }
373
374    /**
375     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
376     * we don't have any table quotas in the cache.
377     */
378    private void updateOnlyMachineQuotaFactors() {
379      Optional<Integer> rsSize = regionServersSize.get();
380      if (rsSize.isPresent()) {
381        updateMachineQuotaFactors(rsSize.get());
382      } else {
383        regionServersSize.refresh();
384      }
385    }
386
387    /**
388     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
389     * factors as well. This relies on a more expensive query for ClusterMetrics.
390     */
391    private void updateTableMachineQuotaFactors() {
392      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
393      if (!clusterMetricsMaybe.isPresent()) {
394        tableRegionStatesClusterMetrics.refresh();
395        return;
396      }
397      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
398      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
399
400      Map<TableName, RegionStatesCount> tableRegionStatesCount =
401        clusterMetrics.getTableRegionStatesCount();
402
403      // Update table machine quota factors
404      for (TableName tableName : tableQuotaCache.keySet()) {
405        if (tableRegionStatesCount.containsKey(tableName)) {
406          double factor = 1;
407          try {
408            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
409            if (regionSize == 0) {
410              factor = 0;
411            } else {
412              int localRegionSize = rsServices.getRegions(tableName).size();
413              factor = 1.0 * localRegionSize / regionSize;
414            }
415          } catch (IOException e) {
416            LOG.warn("Get table regions failed: {}", tableName, e);
417          }
418          tableMachineQuotaFactors.put(tableName, factor);
419        } else {
420          // TableName might have already been dropped (outdated)
421          tableMachineQuotaFactors.remove(tableName);
422        }
423      }
424    }
425
426    private void updateMachineQuotaFactors(int rsSize) {
427      if (rsSize != 0) {
428        // TODO if use rs group, the cluster limit should be shared by the rs group
429        machineQuotaFactor = 1.0 / rsSize;
430      }
431    }
432  }
433
434  /** visible for testing */
435  static <K, V extends QuotaState> void updateNewCacheFromOld(Map<K, V> oldCache,
436    Map<K, V> newCache) {
437    for (Map.Entry<K, V> entry : oldCache.entrySet()) {
438      K key = entry.getKey();
439      if (newCache.containsKey(key)) {
440        V newState = newCache.get(key);
441        V oldState = entry.getValue();
442        oldState.update(newState);
443        newCache.put(key, oldState);
444      }
445    }
446  }
447
448  static class RefreshableExpiringValueCache<T> {
449    private final String name;
450    private final LoadingCache<String, Optional<T>> cache;
451
452    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
453      ThrowingSupplier<T> supplier) {
454      this.name = name;
455      this.cache =
456        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
457          .build(new CacheLoader<>() {
458            @Override
459            public Optional<T> load(String key) {
460              try {
461                return Optional.of(supplier.get());
462              } catch (Exception e) {
463                LOG.warn("Failed to refresh cache {}", name, e);
464                return Optional.empty();
465              }
466            }
467          });
468    }
469
470    Optional<T> get() {
471      return cache.getUnchecked(name);
472    }
473
474    void refresh() {
475      cache.refresh(name);
476    }
477
478    void invalidate() {
479      cache.invalidate(name);
480    }
481  }
482
483  @FunctionalInterface
484  static interface ThrowingSupplier<T> {
485    T get() throws Exception;
486  }
487
488}