001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
055
056/**
057 * Cache that keeps track of the quota settings for the users and tables that are interacting with
058 * it.
059 */
060@InterfaceAudience.Private
061@InterfaceStability.Evolving
062public class QuotaCache implements Stoppable {
063  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
064
065  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
066  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
067    "hbase.quota.cache.ttl.region.states.ms";
068  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
069    "hbase.quota.cache.ttl.servers.size.ms";
070
071  // defines the request attribute key which, when provided, will override the request's username
072  // from the perspective of user quotas
073  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
074    "hbase.quota.user.override.key";
075  private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours
076  private static final int EVICT_PERIOD_FACTOR = 5;
077
078  // for testing purpose only, enforce the cache to be always refreshed
079  static boolean TEST_FORCE_REFRESH = false;
080  // for testing purpose only, block cache refreshes to reliably verify state
081  static boolean TEST_BLOCK_REFRESH = false;
082
083  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
084  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
085  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
086  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
087    new ConcurrentHashMap<>();
088  private volatile boolean exceedThrottleQuotaEnabled = false;
089  // factors used to divide cluster scope quota into machine scope quota
090  private volatile double machineQuotaFactor = 1;
091  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
092    new ConcurrentHashMap<>();
093  private final RegionServerServices rsServices;
094  private final String userOverrideRequestAttributeKey;
095
096  private QuotaRefresherChore refreshChore;
097  private boolean stopped = true;
098
099  private final Fetcher<String, UserQuotaState> userQuotaStateFetcher = new Fetcher<>() {
100    @Override
101    public Get makeGet(final String user) {
102      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
103      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
104      return QuotaUtil.makeGetForUserQuotas(user, tables, namespaces);
105    }
106
107    @Override
108    public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
109      return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, tableMachineQuotaFactors,
110        machineQuotaFactor);
111    }
112  };
113
114  private final Fetcher<String, QuotaState> regionServerQuotaStateFetcher = new Fetcher<>() {
115    @Override
116    public Get makeGet(final String regionServer) {
117      return QuotaUtil.makeGetForRegionServerQuotas(regionServer);
118    }
119
120    @Override
121    public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
122      return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
123    }
124  };
125
126  private final Fetcher<TableName, QuotaState> tableQuotaStateFetcher = new Fetcher<>() {
127    @Override
128    public Get makeGet(final TableName table) {
129      return QuotaUtil.makeGetForTableQuotas(table);
130    }
131
132    @Override
133    public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
134      return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, tableMachineQuotaFactors);
135    }
136  };
137
138  private final Fetcher<String, QuotaState> namespaceQuotaStateFetcher = new Fetcher<>() {
139    @Override
140    public Get makeGet(final String namespace) {
141      return QuotaUtil.makeGetForNamespaceQuotas(namespace);
142    }
143
144    @Override
145    public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
146      return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, machineQuotaFactor);
147    }
148  };
149
150  public QuotaCache(final RegionServerServices rsServices) {
151    this.rsServices = rsServices;
152    this.userOverrideRequestAttributeKey =
153      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
154  }
155
156  public void start() throws IOException {
157    stopped = false;
158
159    Configuration conf = rsServices.getConfiguration();
160    // Refresh the cache every 12 hours, and every time a quota is changed, and every time a
161    // configuration
162    // reload is triggered. Periodic reloads are kept to a minimum to avoid flooding the
163    // RegionServer
164    // holding the hbase:quota table with requests.
165    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
166    refreshChore = new QuotaRefresherChore(conf, period, this);
167    rsServices.getChoreService().scheduleChore(refreshChore);
168  }
169
170  @Override
171  public void stop(final String why) {
172    if (refreshChore != null) {
173      LOG.debug("Stopping QuotaRefresherChore chore.");
174      refreshChore.shutdown(true);
175    }
176    stopped = true;
177  }
178
179  @Override
180  public boolean isStopped() {
181    return stopped;
182  }
183
184  /**
185   * Returns the limiter associated to the specified user/table.
186   * @param ugi   the user to limit
187   * @param table the table to limit
188   * @return the limiter associated to the specified user/table
189   */
190  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
191    if (table.isSystemTable()) {
192      return NoopQuotaLimiter.get();
193    }
194    return getUserQuotaState(ugi).getTableLimiter(table);
195  }
196
197  /**
198   * Returns the QuotaState associated to the specified user.
199   * @param ugi the user
200   * @return the quota info associated to specified user
201   */
202  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
203    String user = getQuotaUserName(ugi);
204    if (!userQuotaCache.containsKey(user)) {
205      userQuotaCache.put(user,
206        QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
207      fetch("user", userQuotaCache, userQuotaStateFetcher);
208    }
209    return userQuotaCache.get(user);
210  }
211
212  /**
213   * Returns the limiter associated to the specified table.
214   * @param table the table to limit
215   * @return the limiter associated to the specified table
216   */
217  public QuotaLimiter getTableLimiter(final TableName table) {
218    if (!tableQuotaCache.containsKey(table)) {
219      tableQuotaCache.put(table, new QuotaState());
220      fetch("table", tableQuotaCache, tableQuotaStateFetcher);
221    }
222    return tableQuotaCache.get(table).getGlobalLimiter();
223  }
224
225  /**
226   * Returns the limiter associated to the specified namespace.
227   * @param namespace the namespace to limit
228   * @return the limiter associated to the specified namespace
229   */
230  public QuotaLimiter getNamespaceLimiter(final String namespace) {
231    if (!namespaceQuotaCache.containsKey(namespace)) {
232      namespaceQuotaCache.put(namespace, new QuotaState());
233      fetch("namespace", namespaceQuotaCache, namespaceQuotaStateFetcher);
234    }
235    return namespaceQuotaCache.get(namespace).getGlobalLimiter();
236  }
237
238  /**
239   * Returns the limiter associated to the specified region server.
240   * @param regionServer the region server to limit
241   * @return the limiter associated to the specified region server
242   */
243  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
244    if (!regionServerQuotaCache.containsKey(regionServer)) {
245      regionServerQuotaCache.put(regionServer, new QuotaState());
246      fetch("regionServer", regionServerQuotaCache, regionServerQuotaStateFetcher);
247    }
248    return regionServerQuotaCache.get(regionServer).getGlobalLimiter();
249  }
250
251  protected boolean isExceedThrottleQuotaEnabled() {
252    return exceedThrottleQuotaEnabled;
253  }
254
255  private <K, V extends QuotaState> void fetch(final String type, final Map<K, V> quotasMap,
256    final Fetcher<K, V> fetcher) {
257    // Find the quota entries to update
258    List<Get> gets = quotasMap.keySet().stream().map(fetcher::makeGet).collect(Collectors.toList());
259
260    // fetch and update the quota entries
261    if (!gets.isEmpty()) {
262      try {
263        for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
264          V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
265          if (quotaInfo != null) {
266            quotaInfo.update(entry.getValue());
267          }
268
269          if (LOG.isTraceEnabled()) {
270            LOG.trace("Loading {} key={} quotas={}", type, entry.getKey(), quotaInfo);
271          }
272        }
273      } catch (IOException e) {
274        LOG.warn("Unable to read {} from quota table", type, e);
275      }
276    }
277  }
278
279  /**
280   * Applies a request attribute user override if available, otherwise returns the UGI's short
281   * username
282   * @param ugi The request's UserGroupInformation
283   */
284  private String getQuotaUserName(final UserGroupInformation ugi) {
285    if (userOverrideRequestAttributeKey == null) {
286      return ugi.getShortUserName();
287    }
288
289    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
290    if (!rpcCall.isPresent()) {
291      return ugi.getShortUserName();
292    }
293
294    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
295    if (override == null) {
296      return ugi.getShortUserName();
297    }
298    return Bytes.toString(override);
299  }
300
301  void triggerCacheRefresh() {
302    refreshChore.triggerNow();
303  }
304
305  void forceSynchronousCacheRefresh() {
306    refreshChore.chore();
307  }
308
309  Map<String, QuotaState> getNamespaceQuotaCache() {
310    return namespaceQuotaCache;
311  }
312
313  Map<String, QuotaState> getRegionServerQuotaCache() {
314    return regionServerQuotaCache;
315  }
316
317  Map<TableName, QuotaState> getTableQuotaCache() {
318    return tableQuotaCache;
319  }
320
321  Map<String, UserQuotaState> getUserQuotaCache() {
322    return userQuotaCache;
323  }
324
325  // TODO: Remove this once we have the notification bus
326  private class QuotaRefresherChore extends ScheduledChore {
327    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
328    // So we cache the results to reduce that load.
329    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
330    private final RefreshableExpiringValueCache<Integer> regionServersSize;
331
332    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
333      super("QuotaRefresherChore", stoppable, period);
334
335      Duration tableRegionStatesCacheTtl =
336        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
337      this.tableRegionStatesClusterMetrics =
338        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
339          tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
340            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
341
342      Duration regionServersSizeCacheTtl =
343        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
344      regionServersSize =
345        new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
346          () -> rsServices.getConnection().getAdmin().getRegionServers().size());
347    }
348
349    @Override
350    public synchronized boolean triggerNow() {
351      tableRegionStatesClusterMetrics.invalidate();
352      regionServersSize.invalidate();
353      return super.triggerNow();
354    }
355
356    @Override
357    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
358        justification = "I do not understand why the complaints, it looks good to me -- FIX")
359    protected void chore() {
360      while (TEST_BLOCK_REFRESH) {
361        LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
362        try {
363          Thread.sleep(10);
364        } catch (InterruptedException e) {
365          throw new RuntimeException(e);
366        }
367      }
368      // Prefetch online tables/namespaces
369      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
370        if (table.isSystemTable()) {
371          continue;
372        }
373        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
374
375        final String ns = table.getNamespaceAsString();
376
377        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
378      }
379
380      QuotaCache.this.regionServerQuotaCache
381        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
382
383      updateQuotaFactors();
384      fetchAndEvict("namespace", QuotaCache.this.namespaceQuotaCache, namespaceQuotaStateFetcher);
385      fetchAndEvict("table", QuotaCache.this.tableQuotaCache, tableQuotaStateFetcher);
386      fetchAndEvict("user", QuotaCache.this.userQuotaCache, userQuotaStateFetcher);
387      fetchAndEvict("regionServer", QuotaCache.this.regionServerQuotaCache,
388        regionServerQuotaStateFetcher);
389      fetchExceedThrottleQuota();
390    }
391
392    private void fetchExceedThrottleQuota() {
393      try {
394        QuotaCache.this.exceedThrottleQuotaEnabled =
395          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
396      } catch (IOException e) {
397        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
398      }
399    }
400
401    private <K, V extends QuotaState> void fetchAndEvict(final String type,
402      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
403      long now = EnvironmentEdgeManager.currentTime();
404      long evictPeriod = getPeriod() * EVICT_PERIOD_FACTOR;
405      // Find the quota entries to update
406      List<Get> gets = new ArrayList<>();
407      List<K> toRemove = new ArrayList<>();
408      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
409        long lastQuery = entry.getValue().getLastQuery();
410        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
411          toRemove.add(entry.getKey());
412        } else {
413          gets.add(fetcher.makeGet(entry.getKey()));
414        }
415      }
416
417      for (final K key : toRemove) {
418        if (LOG.isTraceEnabled()) {
419          LOG.trace("evict " + type + " key=" + key);
420        }
421        quotasMap.remove(key);
422      }
423
424      // fetch and update the quota entries
425      if (!gets.isEmpty()) {
426        try {
427          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
428            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
429            if (quotaInfo != null) {
430              quotaInfo.update(entry.getValue());
431            }
432
433            if (LOG.isTraceEnabled()) {
434              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
435            }
436          }
437        } catch (IOException e) {
438          LOG.warn("Unable to read " + type + " from quota table", e);
439        }
440      }
441    }
442
443    /**
444     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
445     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
446     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
447     */
448    private void updateQuotaFactors() {
449      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
450        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
451      if (hasTableQuotas) {
452        updateTableMachineQuotaFactors();
453      } else {
454        updateOnlyMachineQuotaFactors();
455      }
456    }
457
458    /**
459     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
460     * we don't have any table quotas in the cache.
461     */
462    private void updateOnlyMachineQuotaFactors() {
463      Optional<Integer> rsSize = regionServersSize.get();
464      if (rsSize.isPresent()) {
465        updateMachineQuotaFactors(rsSize.get());
466      } else {
467        regionServersSize.refresh();
468      }
469    }
470
471    /**
472     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
473     * factors as well. This relies on a more expensive query for ClusterMetrics.
474     */
475    private void updateTableMachineQuotaFactors() {
476      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
477      if (!clusterMetricsMaybe.isPresent()) {
478        tableRegionStatesClusterMetrics.refresh();
479        return;
480      }
481      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
482      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
483
484      Map<TableName, RegionStatesCount> tableRegionStatesCount =
485        clusterMetrics.getTableRegionStatesCount();
486
487      // Update table machine quota factors
488      for (TableName tableName : tableQuotaCache.keySet()) {
489        if (tableRegionStatesCount.containsKey(tableName)) {
490          double factor = 1;
491          try {
492            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
493            if (regionSize == 0) {
494              factor = 0;
495            } else {
496              int localRegionSize = rsServices.getRegions(tableName).size();
497              factor = 1.0 * localRegionSize / regionSize;
498            }
499          } catch (IOException e) {
500            LOG.warn("Get table regions failed: {}", tableName, e);
501          }
502          tableMachineQuotaFactors.put(tableName, factor);
503        } else {
504          // TableName might have already been dropped (outdated)
505          tableMachineQuotaFactors.remove(tableName);
506        }
507      }
508    }
509
510    private void updateMachineQuotaFactors(int rsSize) {
511      if (rsSize != 0) {
512        // TODO if use rs group, the cluster limit should be shared by the rs group
513        machineQuotaFactor = 1.0 / rsSize;
514      }
515    }
516  }
517
518  static class RefreshableExpiringValueCache<T> {
519    private final String name;
520    private final LoadingCache<String, Optional<T>> cache;
521
522    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
523      ThrowingSupplier<T> supplier) {
524      this.name = name;
525      this.cache =
526        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
527          .build(new CacheLoader<>() {
528            @Override
529            public Optional<T> load(String key) {
530              try {
531                return Optional.of(supplier.get());
532              } catch (Exception e) {
533                LOG.warn("Failed to refresh cache {}", name, e);
534                return Optional.empty();
535              }
536            }
537          });
538    }
539
540    Optional<T> get() {
541      return cache.getUnchecked(name);
542    }
543
544    void refresh() {
545      cache.refresh(name);
546    }
547
548    void invalidate() {
549      cache.invalidate(name);
550    }
551  }
552
553  @FunctionalInterface
554  static interface ThrowingSupplier<T> {
555    T get() throws Exception;
556  }
557
558  interface Fetcher<Key, Value> {
559    Get makeGet(Key key);
560
561    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
562  }
563}