001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.EnumSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Optional;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.ClusterMetrics.Option;
034import org.apache.hadoop.hbase.ScheduledChore;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.RegionStatesCount;
039import org.apache.hadoop.hbase.ipc.RpcCall;
040import org.apache.hadoop.hbase.ipc.RpcServer;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.regionserver.RegionServerServices;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.apache.yetus.audience.InterfaceStability;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Cache that keeps track of the quota settings for the users and tables that are interacting with
053 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
054 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
055 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
056 * events. Later the Quotas will be pushed using the notification system.
057 */
058@InterfaceAudience.Private
059@InterfaceStability.Evolving
060public class QuotaCache implements Stoppable {
061  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
062
063  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
064
065  // defines the request attribute key which, when provided, will override the request's username
066  // from the perspective of user quotas
067  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
068    "hbase.quota.user.override.key";
069  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
070  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
071
072  // for testing purpose only, enforce the cache to be always refreshed
073  static boolean TEST_FORCE_REFRESH = false;
074  // for testing purpose only, block cache refreshes to reliably verify state
075  static boolean TEST_BLOCK_REFRESH = false;
076
077  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
078  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
079  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
080  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
081    new ConcurrentHashMap<>();
082  private volatile boolean exceedThrottleQuotaEnabled = false;
083  // factors used to divide cluster scope quota into machine scope quota
084  private volatile double machineQuotaFactor = 1;
085  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
086    new ConcurrentHashMap<>();
087  private final RegionServerServices rsServices;
088  private final String userOverrideRequestAttributeKey;
089
090  private QuotaRefresherChore refreshChore;
091  private boolean stopped = true;
092
093  public QuotaCache(final RegionServerServices rsServices) {
094    this.rsServices = rsServices;
095    this.userOverrideRequestAttributeKey =
096      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
097  }
098
099  public void start() throws IOException {
100    stopped = false;
101
102    // TODO: This will be replaced once we have the notification bus ready.
103    Configuration conf = rsServices.getConfiguration();
104    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
105    refreshChore = new QuotaRefresherChore(period, this);
106    rsServices.getChoreService().scheduleChore(refreshChore);
107  }
108
109  @Override
110  public void stop(final String why) {
111    if (refreshChore != null) {
112      LOG.debug("Stopping QuotaRefresherChore chore.");
113      refreshChore.shutdown(true);
114    }
115    stopped = true;
116  }
117
118  @Override
119  public boolean isStopped() {
120    return stopped;
121  }
122
123  /**
124   * Returns the limiter associated to the specified user/table.
125   * @param ugi   the user to limit
126   * @param table the table to limit
127   * @return the limiter associated to the specified user/table
128   */
129  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
130    if (table.isSystemTable()) {
131      return NoopQuotaLimiter.get();
132    }
133    return getUserQuotaState(ugi).getTableLimiter(table);
134  }
135
136  /**
137   * Returns the QuotaState associated to the specified user.
138   * @param ugi the user
139   * @return the quota info associated to specified user
140   */
141  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
142    return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
143      () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L),
144      this::triggerCacheRefresh);
145  }
146
147  /**
148   * Returns the limiter associated to the specified table.
149   * @param table the table to limit
150   * @return the limiter associated to the specified table
151   */
152  public QuotaLimiter getTableLimiter(final TableName table) {
153    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
154  }
155
156  /**
157   * Returns the limiter associated to the specified namespace.
158   * @param namespace the namespace to limit
159   * @return the limiter associated to the specified namespace
160   */
161  public QuotaLimiter getNamespaceLimiter(final String namespace) {
162    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
163  }
164
165  /**
166   * Returns the limiter associated to the specified region server.
167   * @param regionServer the region server to limit
168   * @return the limiter associated to the specified region server
169   */
170  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
171    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
172  }
173
174  protected boolean isExceedThrottleQuotaEnabled() {
175    return exceedThrottleQuotaEnabled;
176  }
177
178  /**
179   * Applies a request attribute user override if available, otherwise returns the UGI's short
180   * username
181   * @param ugi The request's UserGroupInformation
182   */
183  private String getQuotaUserName(final UserGroupInformation ugi) {
184    if (userOverrideRequestAttributeKey == null) {
185      return ugi.getShortUserName();
186    }
187
188    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
189    if (!rpcCall.isPresent()) {
190      return ugi.getShortUserName();
191    }
192
193    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
194    if (override == null) {
195      return ugi.getShortUserName();
196    }
197    return Bytes.toString(override);
198  }
199
200  /**
201   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
202   * returned and the quota request will be enqueued for the next cache refresh.
203   */
204  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
205    return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
206  }
207
208  void triggerCacheRefresh() {
209    refreshChore.triggerNow();
210  }
211
212  long getLastUpdate() {
213    return refreshChore.lastUpdate;
214  }
215
216  Map<String, QuotaState> getNamespaceQuotaCache() {
217    return namespaceQuotaCache;
218  }
219
220  Map<String, QuotaState> getRegionServerQuotaCache() {
221    return regionServerQuotaCache;
222  }
223
224  Map<TableName, QuotaState> getTableQuotaCache() {
225    return tableQuotaCache;
226  }
227
228  Map<String, UserQuotaState> getUserQuotaCache() {
229    return userQuotaCache;
230  }
231
232  // TODO: Remove this once we have the notification bus
233  private class QuotaRefresherChore extends ScheduledChore {
234    private long lastUpdate = 0;
235
236    public QuotaRefresherChore(final int period, final Stoppable stoppable) {
237      super("QuotaRefresherChore", stoppable, period);
238    }
239
240    @Override
241    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
242        justification = "I do not understand why the complaints, it looks good to me -- FIX")
243    protected void chore() {
244      while (TEST_BLOCK_REFRESH) {
245        LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
246        try {
247          Thread.sleep(10);
248        } catch (InterruptedException e) {
249          throw new RuntimeException(e);
250        }
251      }
252      // Prefetch online tables/namespaces
253      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
254        if (table.isSystemTable()) {
255          continue;
256        }
257        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
258
259        final String ns = table.getNamespaceAsString();
260
261        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
262      }
263
264      QuotaCache.this.regionServerQuotaCache
265        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
266
267      updateQuotaFactors();
268      fetchNamespaceQuotaState();
269      fetchTableQuotaState();
270      fetchUserQuotaState();
271      fetchRegionServerQuotaState();
272      fetchExceedThrottleQuota();
273      lastUpdate = EnvironmentEdgeManager.currentTime();
274    }
275
276    private void fetchNamespaceQuotaState() {
277      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
278        @Override
279        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
280          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
281        }
282
283        @Override
284        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
285          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
286            machineQuotaFactor);
287        }
288      });
289    }
290
291    private void fetchTableQuotaState() {
292      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
293        @Override
294        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
295          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
296        }
297
298        @Override
299        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
300          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
301            tableMachineQuotaFactors);
302        }
303      });
304    }
305
306    private void fetchUserQuotaState() {
307      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
308      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
309      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
310        @Override
311        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
312          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
313        }
314
315        @Override
316        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
317          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
318            tableMachineQuotaFactors, machineQuotaFactor);
319        }
320      });
321    }
322
323    private void fetchRegionServerQuotaState() {
324      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
325        new Fetcher<String, QuotaState>() {
326          @Override
327          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
328            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
329          }
330
331          @Override
332          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
333            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
334          }
335        });
336    }
337
338    private void fetchExceedThrottleQuota() {
339      try {
340        QuotaCache.this.exceedThrottleQuotaEnabled =
341          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
342      } catch (IOException e) {
343        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
344      }
345    }
346
347    private <K, V extends QuotaState> void fetch(final String type,
348      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
349      long now = EnvironmentEdgeManager.currentTime();
350      long refreshPeriod = getPeriod();
351      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
352
353      // Find the quota entries to update
354      List<Get> gets = new ArrayList<>();
355      List<K> toRemove = new ArrayList<>();
356      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
357        long lastUpdate = entry.getValue().getLastUpdate();
358        long lastQuery = entry.getValue().getLastQuery();
359        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
360          toRemove.add(entry.getKey());
361        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
362          gets.add(fetcher.makeGet(entry));
363        }
364      }
365
366      for (final K key : toRemove) {
367        if (LOG.isTraceEnabled()) {
368          LOG.trace("evict " + type + " key=" + key);
369        }
370        quotasMap.remove(key);
371      }
372
373      // fetch and update the quota entries
374      if (!gets.isEmpty()) {
375        try {
376          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
377            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
378            if (quotaInfo != null) {
379              quotaInfo.update(entry.getValue());
380            }
381
382            if (LOG.isTraceEnabled()) {
383              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
384            }
385          }
386        } catch (IOException e) {
387          LOG.warn("Unable to read " + type + " from quota table", e);
388        }
389      }
390    }
391
392    /**
393     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
394     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
395     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
396     */
397    private void updateQuotaFactors() {
398      // Update machine quota factor
399      ClusterMetrics clusterMetrics;
400      try {
401        clusterMetrics = rsServices.getConnection().getAdmin()
402          .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
403      } catch (IOException e) {
404        LOG.warn("Failed to get cluster metrics needed for updating quotas", e);
405        return;
406      }
407
408      int rsSize = clusterMetrics.getServersName().size();
409      if (rsSize != 0) {
410        // TODO if use rs group, the cluster limit should be shared by the rs group
411        machineQuotaFactor = 1.0 / rsSize;
412      }
413
414      Map<TableName, RegionStatesCount> tableRegionStatesCount =
415        clusterMetrics.getTableRegionStatesCount();
416
417      // Update table machine quota factors
418      for (TableName tableName : tableQuotaCache.keySet()) {
419        if (tableRegionStatesCount.containsKey(tableName)) {
420          double factor = 1;
421          try {
422            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
423            if (regionSize == 0) {
424              factor = 0;
425            } else {
426              int localRegionSize = rsServices.getRegions(tableName).size();
427              factor = 1.0 * localRegionSize / regionSize;
428            }
429          } catch (IOException e) {
430            LOG.warn("Get table regions failed: {}", tableName, e);
431          }
432          tableMachineQuotaFactors.put(tableName, factor);
433        } else {
434          // TableName might have already been dropped (outdated)
435          tableMachineQuotaFactors.remove(tableName);
436        }
437      }
438    }
439  }
440
441  static interface Fetcher<Key, Value> {
442    Get makeGet(Map.Entry<Key, Value> entry);
443
444    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
445  }
446}