001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
056
057/**
058 * Cache that keeps track of the quota settings for the users and tables that are interacting with
059 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
060 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
061 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
062 * events. Later the Quotas will be pushed using the notification system.
063 */
064@InterfaceAudience.Private
065@InterfaceStability.Evolving
066public class QuotaCache implements Stoppable {
067  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
068
069  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
070  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
071    "hbase.quota.cache.ttl.region.states.ms";
072  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
073    "hbase.quota.cache.ttl.servers.size.ms";
074
075  // defines the request attribute key which, when provided, will override the request's username
076  // from the perspective of user quotas
077  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
078    "hbase.quota.user.override.key";
079  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
080  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
081
082  // for testing purpose only, enforce the cache to be always refreshed
083  static boolean TEST_FORCE_REFRESH = false;
084  // for testing purpose only, block cache refreshes to reliably verify state
085  static boolean TEST_BLOCK_REFRESH = false;
086
087  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
088  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
089  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
090  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
091    new ConcurrentHashMap<>();
092  private volatile boolean exceedThrottleQuotaEnabled = false;
093  // factors used to divide cluster scope quota into machine scope quota
094  private volatile double machineQuotaFactor = 1;
095  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
096    new ConcurrentHashMap<>();
097  private final RegionServerServices rsServices;
098  private final String userOverrideRequestAttributeKey;
099
100  private QuotaRefresherChore refreshChore;
101  private boolean stopped = true;
102
103  public QuotaCache(final RegionServerServices rsServices) {
104    this.rsServices = rsServices;
105    this.userOverrideRequestAttributeKey =
106      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
107  }
108
109  public void start() throws IOException {
110    stopped = false;
111
112    // TODO: This will be replaced once we have the notification bus ready.
113    Configuration conf = rsServices.getConfiguration();
114    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
115    refreshChore = new QuotaRefresherChore(conf, period, this);
116    rsServices.getChoreService().scheduleChore(refreshChore);
117  }
118
119  @Override
120  public void stop(final String why) {
121    if (refreshChore != null) {
122      LOG.debug("Stopping QuotaRefresherChore chore.");
123      refreshChore.shutdown(true);
124    }
125    stopped = true;
126  }
127
128  @Override
129  public boolean isStopped() {
130    return stopped;
131  }
132
133  /**
134   * Returns the limiter associated to the specified user/table.
135   * @param ugi   the user to limit
136   * @param table the table to limit
137   * @return the limiter associated to the specified user/table
138   */
139  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
140    if (table.isSystemTable()) {
141      return NoopQuotaLimiter.get();
142    }
143    return getUserQuotaState(ugi).getTableLimiter(table);
144  }
145
146  /**
147   * Returns the QuotaState associated to the specified user.
148   * @param ugi the user
149   * @return the quota info associated to specified user
150   */
151  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
152    return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
153      () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
154  }
155
156  /**
157   * Returns the limiter associated to the specified table.
158   * @param table the table to limit
159   * @return the limiter associated to the specified table
160   */
161  public QuotaLimiter getTableLimiter(final TableName table) {
162    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
163  }
164
165  /**
166   * Returns the limiter associated to the specified namespace.
167   * @param namespace the namespace to limit
168   * @return the limiter associated to the specified namespace
169   */
170  public QuotaLimiter getNamespaceLimiter(final String namespace) {
171    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
172  }
173
174  /**
175   * Returns the limiter associated to the specified region server.
176   * @param regionServer the region server to limit
177   * @return the limiter associated to the specified region server
178   */
179  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
180    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
181  }
182
183  protected boolean isExceedThrottleQuotaEnabled() {
184    return exceedThrottleQuotaEnabled;
185  }
186
187  /**
188   * Applies a request attribute user override if available, otherwise returns the UGI's short
189   * username
190   * @param ugi The request's UserGroupInformation
191   */
192  private String getQuotaUserName(final UserGroupInformation ugi) {
193    if (userOverrideRequestAttributeKey == null) {
194      return ugi.getShortUserName();
195    }
196
197    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
198    if (!rpcCall.isPresent()) {
199      return ugi.getShortUserName();
200    }
201
202    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
203    if (override == null) {
204      return ugi.getShortUserName();
205    }
206    return Bytes.toString(override);
207  }
208
209  /**
210   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
211   * returned and the quota request will be enqueued for the next cache refresh.
212   */
213  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
214    return computeIfAbsent(quotasMap, key, QuotaState::new);
215  }
216
217  void triggerCacheRefresh() {
218    refreshChore.triggerNow();
219  }
220
221  long getLastUpdate() {
222    return refreshChore.lastUpdate;
223  }
224
225  Map<String, QuotaState> getNamespaceQuotaCache() {
226    return namespaceQuotaCache;
227  }
228
229  Map<String, QuotaState> getRegionServerQuotaCache() {
230    return regionServerQuotaCache;
231  }
232
233  Map<TableName, QuotaState> getTableQuotaCache() {
234    return tableQuotaCache;
235  }
236
237  Map<String, UserQuotaState> getUserQuotaCache() {
238    return userQuotaCache;
239  }
240
241  // TODO: Remove this once we have the notification bus
242  private class QuotaRefresherChore extends ScheduledChore {
243    private long lastUpdate = 0;
244
245    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
246    // So we cache the results to reduce that load.
247    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
248    private final RefreshableExpiringValueCache<Integer> regionServersSize;
249
250    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
251      super("QuotaRefresherChore", stoppable, period);
252
253      Duration tableRegionStatesCacheTtl =
254        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
255      this.tableRegionStatesClusterMetrics =
256        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
257          tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
258            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
259
260      Duration regionServersSizeCacheTtl =
261        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
262      regionServersSize =
263        new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
264          () -> rsServices.getConnection().getAdmin().getRegionServers().size());
265    }
266
267    @Override
268    public synchronized boolean triggerNow() {
269      tableRegionStatesClusterMetrics.invalidate();
270      regionServersSize.invalidate();
271      return super.triggerNow();
272    }
273
274    @Override
275    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
276        justification = "I do not understand why the complaints, it looks good to me -- FIX")
277    protected void chore() {
278      while (TEST_BLOCK_REFRESH) {
279        LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
280        try {
281          Thread.sleep(10);
282        } catch (InterruptedException e) {
283          throw new RuntimeException(e);
284        }
285      }
286      // Prefetch online tables/namespaces
287      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
288        if (table.isSystemTable()) {
289          continue;
290        }
291        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
292
293        final String ns = table.getNamespaceAsString();
294
295        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
296      }
297
298      QuotaCache.this.regionServerQuotaCache
299        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
300
301      updateQuotaFactors();
302      fetchNamespaceQuotaState();
303      fetchTableQuotaState();
304      fetchUserQuotaState();
305      fetchRegionServerQuotaState();
306      fetchExceedThrottleQuota();
307      lastUpdate = EnvironmentEdgeManager.currentTime();
308    }
309
310    private void fetchNamespaceQuotaState() {
311      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
312        @Override
313        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
314          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
315        }
316
317        @Override
318        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
319          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
320            machineQuotaFactor);
321        }
322      });
323    }
324
325    private void fetchTableQuotaState() {
326      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
327        @Override
328        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
329          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
330        }
331
332        @Override
333        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
334          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
335            tableMachineQuotaFactors);
336        }
337      });
338    }
339
340    private void fetchUserQuotaState() {
341      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
342      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
343      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
344        @Override
345        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
346          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
347        }
348
349        @Override
350        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
351          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
352            tableMachineQuotaFactors, machineQuotaFactor);
353        }
354      });
355    }
356
357    private void fetchRegionServerQuotaState() {
358      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
359        new Fetcher<String, QuotaState>() {
360          @Override
361          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
362            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
363          }
364
365          @Override
366          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
367            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
368          }
369        });
370    }
371
372    private void fetchExceedThrottleQuota() {
373      try {
374        QuotaCache.this.exceedThrottleQuotaEnabled =
375          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
376      } catch (IOException e) {
377        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
378      }
379    }
380
381    private <K, V extends QuotaState> void fetch(final String type,
382      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
383      long now = EnvironmentEdgeManager.currentTime();
384      long refreshPeriod = getPeriod();
385      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
386
387      // Find the quota entries to update
388      List<Get> gets = new ArrayList<>();
389      List<K> toRemove = new ArrayList<>();
390      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
391        long lastUpdate = entry.getValue().getLastUpdate();
392        long lastQuery = entry.getValue().getLastQuery();
393        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
394          toRemove.add(entry.getKey());
395        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
396          gets.add(fetcher.makeGet(entry));
397        }
398      }
399
400      for (final K key : toRemove) {
401        if (LOG.isTraceEnabled()) {
402          LOG.trace("evict " + type + " key=" + key);
403        }
404        quotasMap.remove(key);
405      }
406
407      // fetch and update the quota entries
408      if (!gets.isEmpty()) {
409        try {
410          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
411            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
412            if (quotaInfo != null) {
413              quotaInfo.update(entry.getValue());
414            }
415
416            if (LOG.isTraceEnabled()) {
417              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
418            }
419          }
420        } catch (IOException e) {
421          LOG.warn("Unable to read " + type + " from quota table", e);
422        }
423      }
424    }
425
426    /**
427     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
428     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
429     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
430     */
431    private void updateQuotaFactors() {
432      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
433        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
434      if (hasTableQuotas) {
435        updateTableMachineQuotaFactors();
436      } else {
437        updateOnlyMachineQuotaFactors();
438      }
439    }
440
441    /**
442     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
443     * we don't have any table quotas in the cache.
444     */
445    private void updateOnlyMachineQuotaFactors() {
446      Optional<Integer> rsSize = regionServersSize.get();
447      if (rsSize.isPresent()) {
448        updateMachineQuotaFactors(rsSize.get());
449      } else {
450        regionServersSize.refresh();
451      }
452    }
453
454    /**
455     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
456     * factors as well. This relies on a more expensive query for ClusterMetrics.
457     */
458    private void updateTableMachineQuotaFactors() {
459      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
460      if (!clusterMetricsMaybe.isPresent()) {
461        tableRegionStatesClusterMetrics.refresh();
462        return;
463      }
464      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
465      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
466
467      Map<TableName, RegionStatesCount> tableRegionStatesCount =
468        clusterMetrics.getTableRegionStatesCount();
469
470      // Update table machine quota factors
471      for (TableName tableName : tableQuotaCache.keySet()) {
472        if (tableRegionStatesCount.containsKey(tableName)) {
473          double factor = 1;
474          try {
475            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
476            if (regionSize == 0) {
477              factor = 0;
478            } else {
479              int localRegionSize = rsServices.getRegions(tableName).size();
480              factor = 1.0 * localRegionSize / regionSize;
481            }
482          } catch (IOException e) {
483            LOG.warn("Get table regions failed: {}", tableName, e);
484          }
485          tableMachineQuotaFactors.put(tableName, factor);
486        } else {
487          // TableName might have already been dropped (outdated)
488          tableMachineQuotaFactors.remove(tableName);
489        }
490      }
491    }
492
493    private void updateMachineQuotaFactors(int rsSize) {
494      if (rsSize != 0) {
495        // TODO if use rs group, the cluster limit should be shared by the rs group
496        machineQuotaFactor = 1.0 / rsSize;
497      }
498    }
499  }
500
501  static class RefreshableExpiringValueCache<T> {
502    private final String name;
503    private final LoadingCache<String, Optional<T>> cache;
504
505    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
506      ThrowingSupplier<T> supplier) {
507      this.name = name;
508      this.cache =
509        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
510          .build(new CacheLoader<>() {
511            @Override
512            public Optional<T> load(String key) {
513              try {
514                return Optional.of(supplier.get());
515              } catch (Exception e) {
516                LOG.warn("Failed to refresh cache {}", name, e);
517                return Optional.empty();
518              }
519            }
520          });
521    }
522
523    Optional<T> get() {
524      return cache.getUnchecked(name);
525    }
526
527    void refresh() {
528      cache.refresh(name);
529    }
530
531    void invalidate() {
532      cache.invalidate(name);
533    }
534  }
535
536  @FunctionalInterface
537  static interface ThrowingSupplier<T> {
538    T get() throws Exception;
539  }
540
541  static interface Fetcher<Key, Value> {
542    Get makeGet(Map.Entry<Key, Value> entry);
543
544    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
545  }
546}