001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.time.Duration;
022import java.util.EnumSet;
023import java.util.Map;
024import java.util.Optional;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ClusterMetrics;
029import org.apache.hadoop.hbase.ClusterMetrics.Option;
030import org.apache.hadoop.hbase.ScheduledChore;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.RegionStatesCount;
034import org.apache.hadoop.hbase.ipc.RpcCall;
035import org.apache.hadoop.hbase.ipc.RpcServer;
036import org.apache.hadoop.hbase.regionserver.RegionServerServices;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.security.UserGroupInformation;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
045import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
046import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
047
048/**
049 * Cache that keeps track of the quota settings for the users and tables that are interacting with
050 * it.
051 */
052@InterfaceAudience.Private
053@InterfaceStability.Evolving
054public class QuotaCache implements Stoppable {
055  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
056
057  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
058  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
059    "hbase.quota.cache.ttl.region.states.ms";
060  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
061    "hbase.quota.cache.ttl.servers.size.ms";
062
063  // defines the request attribute key which, when provided, will override the request's username
064  // from the perspective of user quotas
065  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
066    "hbase.quota.user.override.key";
067  private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours
068
069  private final Object initializerLock = new Object();
070  private volatile boolean initialized = false;
071
072  private volatile Map<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
073  private volatile Map<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
074  private volatile Map<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
075  private volatile Map<String, QuotaState> regionServerQuotaCache = new ConcurrentHashMap<>();
076
077  private volatile boolean exceedThrottleQuotaEnabled = false;
078  // factors used to divide cluster scope quota into machine scope quota
079  private volatile double machineQuotaFactor = 1;
080  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
081    new ConcurrentHashMap<>();
082  private final RegionServerServices rsServices;
083  private final String userOverrideRequestAttributeKey;
084
085  private QuotaRefresherChore refreshChore;
086  private boolean stopped = true;
087
088  public QuotaCache(final RegionServerServices rsServices) {
089    this.rsServices = rsServices;
090    this.userOverrideRequestAttributeKey =
091      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
092  }
093
094  public void start() throws IOException {
095    stopped = false;
096
097    Configuration conf = rsServices.getConfiguration();
098    // Refresh the cache every 12 hours, and every time a quota is changed, and every time a
099    // configuration reload is triggered. Periodic reloads are kept to a minimum to avoid
100    // flooding the RegionServer holding the hbase:quota table with requests.
101    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
102    refreshChore = new QuotaRefresherChore(conf, period, this);
103    rsServices.getChoreService().scheduleChore(refreshChore);
104  }
105
106  @Override
107  public void stop(final String why) {
108    if (refreshChore != null) {
109      LOG.debug("Stopping QuotaRefresherChore chore.");
110      refreshChore.shutdown(true);
111    }
112    stopped = true;
113  }
114
115  @Override
116  public boolean isStopped() {
117    return stopped;
118  }
119
120  private void ensureInitialized() {
121    if (!initialized) {
122      synchronized (initializerLock) {
123        if (!initialized) {
124          refreshChore.chore();
125          initialized = true;
126        }
127      }
128    }
129  }
130
131  private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException {
132    return QuotaUtil.fetchUserQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
133      tableMachineQuotaFactors, machineQuotaFactor);
134  }
135
136  private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException {
137    return QuotaUtil.fetchRegionServerQuotas(rsServices.getConfiguration(),
138      rsServices.getConnection());
139  }
140
141  private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException {
142    return QuotaUtil.fetchTableQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
143      tableMachineQuotaFactors);
144  }
145
146  private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException {
147    return QuotaUtil.fetchNamespaceQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
148      machineQuotaFactor);
149  }
150
151  /**
152   * Returns the limiter associated to the specified user/table.
153   * @param ugi   the user to limit
154   * @param table the table to limit
155   * @return the limiter associated to the specified user/table
156   */
157  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
158    if (table.isSystemTable()) {
159      return NoopQuotaLimiter.get();
160    }
161    return getUserQuotaState(ugi).getTableLimiter(table);
162  }
163
164  /**
165   * Returns the QuotaState associated to the specified user.
166   * @param ugi the user
167   * @return the quota info associated to specified user
168   */
169  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
170    String user = getQuotaUserName(ugi);
171    ensureInitialized();
172    // local reference because the chore thread may assign to userQuotaCache
173    Map<String, UserQuotaState> cache = userQuotaCache;
174    if (!cache.containsKey(user)) {
175      cache.put(user, QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration()));
176    }
177    return cache.get(user);
178  }
179
180  /**
181   * Returns the limiter associated to the specified table.
182   * @param table the table to limit
183   * @return the limiter associated to the specified table
184   */
185  public QuotaLimiter getTableLimiter(final TableName table) {
186    ensureInitialized();
187    // local reference because the chore thread may assign to tableQuotaCache
188    Map<TableName, QuotaState> cache = tableQuotaCache;
189    if (!cache.containsKey(table)) {
190      cache.put(table, new QuotaState());
191    }
192    return cache.get(table).getGlobalLimiter();
193  }
194
195  /**
196   * Returns the limiter associated to the specified namespace.
197   * @param namespace the namespace to limit
198   * @return the limiter associated to the specified namespace
199   */
200  public QuotaLimiter getNamespaceLimiter(final String namespace) {
201    ensureInitialized();
202    // local reference because the chore thread may assign to namespaceQuotaCache
203    Map<String, QuotaState> cache = namespaceQuotaCache;
204    if (!cache.containsKey(namespace)) {
205      cache.put(namespace, new QuotaState());
206    }
207    return cache.get(namespace).getGlobalLimiter();
208  }
209
210  /**
211   * Returns the limiter associated to the specified region server.
212   * @param regionServer the region server to limit
213   * @return the limiter associated to the specified region server
214   */
215  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
216    ensureInitialized();
217    // local reference because the chore thread may assign to regionServerQuotaCache
218    Map<String, QuotaState> cache = regionServerQuotaCache;
219    if (!cache.containsKey(regionServer)) {
220      cache.put(regionServer, new QuotaState());
221    }
222    return cache.get(regionServer).getGlobalLimiter();
223  }
224
225  protected boolean isExceedThrottleQuotaEnabled() {
226    return exceedThrottleQuotaEnabled;
227  }
228
229  /**
230   * Applies a request attribute user override if available, otherwise returns the UGI's short
231   * username
232   * @param ugi The request's UserGroupInformation
233   */
234  String getQuotaUserName(final UserGroupInformation ugi) {
235    if (userOverrideRequestAttributeKey == null) {
236      return ugi.getShortUserName();
237    }
238
239    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
240    if (!rpcCall.isPresent()) {
241      return ugi.getShortUserName();
242    }
243
244    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
245    if (override == null) {
246      return ugi.getShortUserName();
247    }
248    return Bytes.toString(override);
249  }
250
251  void triggerCacheRefresh() {
252    refreshChore.triggerNow();
253  }
254
255  void forceSynchronousCacheRefresh() {
256    refreshChore.chore();
257  }
258
259  /** visible for testing */
260  Map<String, QuotaState> getNamespaceQuotaCache() {
261    return namespaceQuotaCache;
262  }
263
264  /** visible for testing */
265  Map<String, QuotaState> getRegionServerQuotaCache() {
266    return regionServerQuotaCache;
267  }
268
269  /** visible for testing */
270  Map<TableName, QuotaState> getTableQuotaCache() {
271    return tableQuotaCache;
272  }
273
274  /** visible for testing */
275  Map<String, UserQuotaState> getUserQuotaCache() {
276    return userQuotaCache;
277  }
278
279  // TODO: Remove this once we have the notification bus
280  private class QuotaRefresherChore extends ScheduledChore {
281    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
282    // So we cache the results to reduce that load.
283    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
284    private final RefreshableExpiringValueCache<Integer> regionServersSize;
285
286    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
287      super("QuotaRefresherChore", stoppable, period);
288
289      Duration tableRegionStatesCacheTtl =
290        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
291      this.tableRegionStatesClusterMetrics =
292        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
293          tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
294            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
295
296      Duration regionServersSizeCacheTtl =
297        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
298      regionServersSize =
299        new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
300          () -> rsServices.getConnection().getAdmin().getRegionServers().size());
301    }
302
303    @Override
304    public synchronized boolean triggerNow() {
305      tableRegionStatesClusterMetrics.invalidate();
306      regionServersSize.invalidate();
307      return super.triggerNow();
308    }
309
310    @Override
311    protected void chore() {
312      synchronized (this) {
313        LOG.info("Reloading quota cache from hbase:quota table");
314        updateQuotaFactors();
315
316        try {
317          Map<String, UserQuotaState> newUserQuotaCache =
318            new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
319          updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
320          userQuotaCache = newUserQuotaCache;
321        } catch (IOException e) {
322          LOG.error("Error while fetching user quotas", e);
323        }
324
325        try {
326          Map<String, QuotaState> newRegionServerQuotaCache =
327            new ConcurrentHashMap<>(fetchRegionServerQuotaStateEntries());
328          updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
329          regionServerQuotaCache = newRegionServerQuotaCache;
330        } catch (IOException e) {
331          LOG.error("Error while fetching region server quotas", e);
332        }
333
334        try {
335          Map<TableName, QuotaState> newTableQuotaCache =
336            new ConcurrentHashMap<>(fetchTableQuotaStateEntries());
337          updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
338          tableQuotaCache = newTableQuotaCache;
339        } catch (IOException e) {
340          LOG.error("Error while refreshing table quotas", e);
341        }
342
343        try {
344          Map<String, QuotaState> newNamespaceQuotaCache =
345            new ConcurrentHashMap<>(fetchNamespaceQuotaStateEntries());
346          updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
347          namespaceQuotaCache = newNamespaceQuotaCache;
348        } catch (IOException e) {
349          LOG.error("Error while refreshing namespace quotas", e);
350        }
351
352        fetchExceedThrottleQuota();
353      }
354    }
355
356    private void fetchExceedThrottleQuota() {
357      try {
358        QuotaCache.this.exceedThrottleQuotaEnabled =
359          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
360      } catch (IOException e) {
361        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
362      }
363    }
364
365    /**
366     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
367     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
368     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
369     */
370    private void updateQuotaFactors() {
371      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
372        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
373      if (hasTableQuotas) {
374        updateTableMachineQuotaFactors();
375      } else {
376        updateOnlyMachineQuotaFactors();
377      }
378    }
379
380    /**
381     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
382     * we don't have any table quotas in the cache.
383     */
384    private void updateOnlyMachineQuotaFactors() {
385      Optional<Integer> rsSize = regionServersSize.get();
386      if (rsSize.isPresent()) {
387        updateMachineQuotaFactors(rsSize.get());
388      } else {
389        regionServersSize.refresh();
390      }
391    }
392
393    /**
394     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
395     * factors as well. This relies on a more expensive query for ClusterMetrics.
396     */
397    private void updateTableMachineQuotaFactors() {
398      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
399      if (!clusterMetricsMaybe.isPresent()) {
400        tableRegionStatesClusterMetrics.refresh();
401        return;
402      }
403      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
404      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
405
406      Map<TableName, RegionStatesCount> tableRegionStatesCount =
407        clusterMetrics.getTableRegionStatesCount();
408
409      // Update table machine quota factors
410      for (TableName tableName : tableQuotaCache.keySet()) {
411        if (tableRegionStatesCount.containsKey(tableName)) {
412          double factor = 1;
413          try {
414            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
415            if (regionSize == 0) {
416              factor = 0;
417            } else {
418              int localRegionSize = rsServices.getRegions(tableName).size();
419              factor = 1.0 * localRegionSize / regionSize;
420            }
421          } catch (IOException e) {
422            LOG.warn("Get table regions failed: {}", tableName, e);
423          }
424          tableMachineQuotaFactors.put(tableName, factor);
425        } else {
426          // TableName might have already been dropped (outdated)
427          tableMachineQuotaFactors.remove(tableName);
428        }
429      }
430    }
431
432    private void updateMachineQuotaFactors(int rsSize) {
433      if (rsSize != 0) {
434        // TODO if use rs group, the cluster limit should be shared by the rs group
435        machineQuotaFactor = 1.0 / rsSize;
436      }
437    }
438  }
439
440  /** visible for testing */
441  static <K, V extends QuotaState> void updateNewCacheFromOld(Map<K, V> oldCache,
442    Map<K, V> newCache) {
443    for (Map.Entry<K, V> entry : oldCache.entrySet()) {
444      K key = entry.getKey();
445      if (newCache.containsKey(key)) {
446        V newState = newCache.get(key);
447        V oldState = entry.getValue();
448        oldState.update(newState);
449        newCache.put(key, oldState);
450      }
451    }
452  }
453
454  static class RefreshableExpiringValueCache<T> {
455    private final String name;
456    private final LoadingCache<String, Optional<T>> cache;
457
458    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
459      ThrowingSupplier<T> supplier) {
460      this.name = name;
461      this.cache =
462        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
463          .build(new CacheLoader<>() {
464            @Override
465            public Optional<T> load(String key) {
466              try {
467                return Optional.of(supplier.get());
468              } catch (Exception e) {
469                LOG.warn("Failed to refresh cache {}", name, e);
470                return Optional.empty();
471              }
472            }
473          });
474    }
475
476    Optional<T> get() {
477      return cache.getUnchecked(name);
478    }
479
480    void refresh() {
481      cache.refresh(name);
482    }
483
484    void invalidate() {
485      cache.invalidate(name);
486    }
487  }
488
489  @FunctionalInterface
490  static interface ThrowingSupplier<T> {
491    T get() throws Exception;
492  }
493
494}