001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.EnumSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.concurrent.TimeUnit;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ClusterMetrics;
035import org.apache.hadoop.hbase.ClusterMetrics.Option;
036import org.apache.hadoop.hbase.ScheduledChore;
037import org.apache.hadoop.hbase.Stoppable;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Get;
040import org.apache.hadoop.hbase.client.RegionStatesCount;
041import org.apache.hadoop.hbase.ipc.RpcCall;
042import org.apache.hadoop.hbase.ipc.RpcServer;
043import org.apache.hadoop.hbase.regionserver.HRegionServer;
044import org.apache.hadoop.hbase.regionserver.RegionServerServices;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.security.UserGroupInformation;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.apache.yetus.audience.InterfaceStability;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
054import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
055import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
056
057/**
058 * Cache that keeps track of the quota settings for the users and tables that are interacting with
059 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
060 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
 * TODO: At the moment the cache is refreshed by a chore that runs every 12 hours by default (see
 * {@link #REFRESH_CONF_KEY}) or whenever a refresh is explicitly triggered. Later the quotas will
 * be pushed using the notification system.
063 */
064@InterfaceAudience.Private
065@InterfaceStability.Evolving
066public class QuotaCache implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

  // Configuration key for the period of the quota refresh chore; read in start(), falling back
  // to REFRESH_DEFAULT_PERIOD when unset.
  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
  // Configuration key for the TTL of the cached ClusterMetrics used to compute per-table
  // factors; falls back to the refresh period when unset.
  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
    "hbase.quota.cache.ttl.region.states.ms";
  // Configuration key for the TTL of the cached region server count; falls back to the refresh
  // period when unset.
  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
    "hbase.quota.cache.ttl.servers.size.ms";

  // defines the request attribute key which, when provided, will override the request's username
  // from the perspective of user quotas
  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
    "hbase.quota.user.override.key";
  private static final int REFRESH_DEFAULT_PERIOD = 43_200_000; // 12 hours
  // A cache entry is evicted once its last query is at least this many refresh periods old
  private static final int EVICT_PERIOD_FACTOR = 5;

  // for testing purpose only, enforce the cache to be always refreshed
  static boolean TEST_FORCE_REFRESH = false;
  // for testing purpose only, block cache refreshes to reliably verify state
  static boolean TEST_BLOCK_REFRESH = false;
086
087  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
088  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
089  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
090  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
091    new ConcurrentHashMap<>();
092  private volatile boolean exceedThrottleQuotaEnabled = false;
093  // factors used to divide cluster scope quota into machine scope quota
094  private volatile double machineQuotaFactor = 1;
095  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
096    new ConcurrentHashMap<>();
097  private final RegionServerServices rsServices;
098  private final String userOverrideRequestAttributeKey;
099
100  private QuotaRefresherChore refreshChore;
101  private boolean stopped = true;
102
103  public QuotaCache(final RegionServerServices rsServices) {
104    this.rsServices = rsServices;
105    this.userOverrideRequestAttributeKey =
106      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
107  }
108
109  public void start() throws IOException {
110    stopped = false;
111
112    Configuration conf = rsServices.getConfiguration();
113    // Refresh the cache every 12 hours, and every time a quota is changed, and every time a
114    // configuration
115    // reload is triggered. Periodic reloads are kept to a minimum to avoid flooding the
116    // RegionServer
117    // holding the hbase:quota table with requests.
118    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
119    refreshChore = new QuotaRefresherChore(conf, period, this);
120    rsServices.getChoreService().scheduleChore(refreshChore);
121  }
122
123  @Override
124  public void stop(final String why) {
125    if (refreshChore != null) {
126      LOG.debug("Stopping QuotaRefresherChore chore.");
127      refreshChore.shutdown(true);
128    }
129    stopped = true;
130  }
131
132  @Override
133  public boolean isStopped() {
134    return stopped;
135  }
136
137  /**
138   * Returns the limiter associated to the specified user/table.
139   * @param ugi   the user to limit
140   * @param table the table to limit
141   * @return the limiter associated to the specified user/table
142   */
143  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
144    if (table.isSystemTable()) {
145      return NoopQuotaLimiter.get();
146    }
147    return getUserQuotaState(ugi).getTableLimiter(table);
148  }
149
150  /**
151   * Returns the QuotaState associated to the specified user.
152   * @param ugi the user
153   * @return the quota info associated to specified user
154   */
155  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
156    return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
157      () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
158  }
159
160  /**
161   * Returns the limiter associated to the specified table.
162   * @param table the table to limit
163   * @return the limiter associated to the specified table
164   */
165  public QuotaLimiter getTableLimiter(final TableName table) {
166    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
167  }
168
169  /**
170   * Returns the limiter associated to the specified namespace.
171   * @param namespace the namespace to limit
172   * @return the limiter associated to the specified namespace
173   */
174  public QuotaLimiter getNamespaceLimiter(final String namespace) {
175    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
176  }
177
178  /**
179   * Returns the limiter associated to the specified region server.
180   * @param regionServer the region server to limit
181   * @return the limiter associated to the specified region server
182   */
183  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
184    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
185  }
186
187  protected boolean isExceedThrottleQuotaEnabled() {
188    return exceedThrottleQuotaEnabled;
189  }
190
191  /**
192   * Applies a request attribute user override if available, otherwise returns the UGI's short
193   * username
194   * @param ugi The request's UserGroupInformation
195   */
196  private String getQuotaUserName(final UserGroupInformation ugi) {
197    if (userOverrideRequestAttributeKey == null) {
198      return ugi.getShortUserName();
199    }
200
201    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
202    if (!rpcCall.isPresent()) {
203      return ugi.getShortUserName();
204    }
205
206    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
207    if (override == null) {
208      return ugi.getShortUserName();
209    }
210    return Bytes.toString(override);
211  }
212
213  /**
214   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
215   * returned and the quota request will be enqueued for the next cache refresh.
216   */
217  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
218    return computeIfAbsent(quotasMap, key, QuotaState::new);
219  }
220
221  void triggerCacheRefresh() {
222    refreshChore.triggerNow();
223  }
224
225  void forceSynchronousCacheRefresh() {
226    refreshChore.chore();
227  }
228
229  long getLastUpdate() {
230    return refreshChore.lastUpdate;
231  }
232
233  Map<String, QuotaState> getNamespaceQuotaCache() {
234    return namespaceQuotaCache;
235  }
236
237  Map<String, QuotaState> getRegionServerQuotaCache() {
238    return regionServerQuotaCache;
239  }
240
241  Map<TableName, QuotaState> getTableQuotaCache() {
242    return tableQuotaCache;
243  }
244
245  Map<String, UserQuotaState> getUserQuotaCache() {
246    return userQuotaCache;
247  }
248
249  // TODO: Remove this once we have the notification bus
250  private class QuotaRefresherChore extends ScheduledChore {
    // Timestamp taken at the end of the last chore() run; 0 until the first run completes
    private long lastUpdate = 0;

    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
    // So we cache the results to reduce that load.
    // Cluster metrics (server names and per-table region counts) used for per-table factors
    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
    // Number of region servers, used when only the machine-wide factor is needed
    private final RefreshableExpiringValueCache<Integer> regionServersSize;
257
258    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
259      super("QuotaRefresherChore", stoppable, period);
260
261      Duration tableRegionStatesCacheTtl =
262        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
263      this.tableRegionStatesClusterMetrics =
264        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
265          tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
266            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
267
268      Duration regionServersSizeCacheTtl =
269        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
270      regionServersSize =
271        new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
272          () -> rsServices.getConnection().getAdmin().getRegionServers().size());
273    }
274
275    @Override
276    public synchronized boolean triggerNow() {
277      tableRegionStatesClusterMetrics.invalidate();
278      regionServersSize.invalidate();
279      return super.triggerNow();
280    }
281
282    @Override
283    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
284        justification = "I do not understand why the complaints, it looks good to me -- FIX")
285    protected void chore() {
286      while (TEST_BLOCK_REFRESH) {
287        LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
288        try {
289          Thread.sleep(10);
290        } catch (InterruptedException e) {
291          throw new RuntimeException(e);
292        }
293      }
294      // Prefetch online tables/namespaces
295      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
296        if (table.isSystemTable()) {
297          continue;
298        }
299        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
300
301        final String ns = table.getNamespaceAsString();
302
303        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
304      }
305
306      QuotaCache.this.regionServerQuotaCache
307        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
308
309      updateQuotaFactors();
310      fetchNamespaceQuotaState();
311      fetchTableQuotaState();
312      fetchUserQuotaState();
313      fetchRegionServerQuotaState();
314      fetchExceedThrottleQuota();
315      lastUpdate = EnvironmentEdgeManager.currentTime();
316    }
317
318    private void fetchNamespaceQuotaState() {
319      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
320        @Override
321        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
322          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
323        }
324
325        @Override
326        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
327          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
328            machineQuotaFactor);
329        }
330      });
331    }
332
333    private void fetchTableQuotaState() {
334      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
335        @Override
336        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
337          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
338        }
339
340        @Override
341        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
342          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
343            tableMachineQuotaFactors);
344        }
345      });
346    }
347
348    private void fetchUserQuotaState() {
349      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
350      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
351      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
352        @Override
353        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
354          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
355        }
356
357        @Override
358        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
359          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
360            tableMachineQuotaFactors, machineQuotaFactor);
361        }
362      });
363    }
364
365    private void fetchRegionServerQuotaState() {
366      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
367        new Fetcher<String, QuotaState>() {
368          @Override
369          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
370            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
371          }
372
373          @Override
374          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
375            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
376          }
377        });
378    }
379
380    private void fetchExceedThrottleQuota() {
381      try {
382        QuotaCache.this.exceedThrottleQuotaEnabled =
383          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
384      } catch (IOException e) {
385        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
386      }
387    }
388
389    private <K, V extends QuotaState> void fetch(final String type,
390      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
391      long now = EnvironmentEdgeManager.currentTime();
392      long evictPeriod = getPeriod() * EVICT_PERIOD_FACTOR;
393      // Find the quota entries to update
394      List<Get> gets = new ArrayList<>();
395      List<K> toRemove = new ArrayList<>();
396      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
397        long lastQuery = entry.getValue().getLastQuery();
398        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
399          toRemove.add(entry.getKey());
400        } else {
401          gets.add(fetcher.makeGet(entry));
402        }
403      }
404
405      for (final K key : toRemove) {
406        if (LOG.isTraceEnabled()) {
407          LOG.trace("evict " + type + " key=" + key);
408        }
409        quotasMap.remove(key);
410      }
411
412      // fetch and update the quota entries
413      if (!gets.isEmpty()) {
414        try {
415          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
416            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
417            if (quotaInfo != null) {
418              quotaInfo.update(entry.getValue());
419            }
420
421            if (LOG.isTraceEnabled()) {
422              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
423            }
424          }
425        } catch (IOException e) {
426          LOG.warn("Unable to read " + type + " from quota table", e);
427        }
428      }
429    }
430
431    /**
432     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
433     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
434     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
435     */
436    private void updateQuotaFactors() {
437      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
438        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
439      if (hasTableQuotas) {
440        updateTableMachineQuotaFactors();
441      } else {
442        updateOnlyMachineQuotaFactors();
443      }
444    }
445
446    /**
447     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
448     * we don't have any table quotas in the cache.
449     */
450    private void updateOnlyMachineQuotaFactors() {
451      Optional<Integer> rsSize = regionServersSize.get();
452      if (rsSize.isPresent()) {
453        updateMachineQuotaFactors(rsSize.get());
454      } else {
455        regionServersSize.refresh();
456      }
457    }
458
459    /**
460     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
461     * factors as well. This relies on a more expensive query for ClusterMetrics.
462     */
463    private void updateTableMachineQuotaFactors() {
464      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
465      if (!clusterMetricsMaybe.isPresent()) {
466        tableRegionStatesClusterMetrics.refresh();
467        return;
468      }
469      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
470      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
471
472      Map<TableName, RegionStatesCount> tableRegionStatesCount =
473        clusterMetrics.getTableRegionStatesCount();
474
475      // Update table machine quota factors
476      for (TableName tableName : tableQuotaCache.keySet()) {
477        if (tableRegionStatesCount.containsKey(tableName)) {
478          double factor = 1;
479          try {
480            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
481            if (regionSize == 0) {
482              factor = 0;
483            } else {
484              int localRegionSize = rsServices.getRegions(tableName).size();
485              factor = 1.0 * localRegionSize / regionSize;
486            }
487          } catch (IOException e) {
488            LOG.warn("Get table regions failed: {}", tableName, e);
489          }
490          tableMachineQuotaFactors.put(tableName, factor);
491        } else {
492          // TableName might have already been dropped (outdated)
493          tableMachineQuotaFactors.remove(tableName);
494        }
495      }
496    }
497
498    private void updateMachineQuotaFactors(int rsSize) {
499      if (rsSize != 0) {
500        // TODO if use rs group, the cluster limit should be shared by the rs group
501        machineQuotaFactor = 1.0 / rsSize;
502      }
503    }
504  }
505
506  static class RefreshableExpiringValueCache<T> {
507    private final String name;
508    private final LoadingCache<String, Optional<T>> cache;
509
510    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
511      ThrowingSupplier<T> supplier) {
512      this.name = name;
513      this.cache =
514        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
515          .build(new CacheLoader<>() {
516            @Override
517            public Optional<T> load(String key) {
518              try {
519                return Optional.of(supplier.get());
520              } catch (Exception e) {
521                LOG.warn("Failed to refresh cache {}", name, e);
522                return Optional.empty();
523              }
524            }
525          });
526    }
527
528    Optional<T> get() {
529      return cache.getUnchecked(name);
530    }
531
532    void refresh() {
533      cache.refresh(name);
534    }
535
536    void invalidate() {
537      cache.invalidate(name);
538    }
539  }
540
541  @FunctionalInterface
542  static interface ThrowingSupplier<T> {
543    T get() throws Exception;
544  }
545
546  static interface Fetcher<Key, Value> {
547    Get makeGet(Map.Entry<Key, Value> entry);
548
549    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
550  }
551}