001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import org.apache.hadoop.hbase.TableName;
024import org.apache.hadoop.hbase.ipc.RpcScheduler;
025import org.apache.hadoop.hbase.ipc.RpcServer;
026import org.apache.hadoop.hbase.regionserver.Region;
027import org.apache.hadoop.hbase.regionserver.RegionServerServices;
028import org.apache.hadoop.hbase.security.User;
029import org.apache.hadoop.security.UserGroupInformation;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.yetus.audience.InterfaceStability;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
036
037/**
038 * Region Server Quota Manager. It is responsible to provide access to the quota information of each
039 * user/table. The direct user of this class is the RegionServer that will get and check the
040 * user/table quota for each operation (put, get, scan). For system tables and user/table with a
041 * quota specified, the quota check will be a noop.
042 */
043@InterfaceAudience.Private
044@InterfaceStability.Evolving
045public class RegionServerRpcQuotaManager {
046  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
047
048  private final RegionServerServices rsServices;
049
050  private QuotaCache quotaCache = null;
051  private volatile boolean rpcThrottleEnabled;
052  // Storage for quota rpc throttle
053  private RpcThrottleStorage rpcThrottleStorage;
054
055  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
056    this.rsServices = rsServices;
057    rpcThrottleStorage =
058      new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
059  }
060
061  public void start(final RpcScheduler rpcScheduler) throws IOException {
062    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
063      LOG.info("Quota support disabled");
064      return;
065    }
066
067    LOG.info("Initializing RPC quota support");
068
069    // Initialize quota cache
070    quotaCache = new QuotaCache(rsServices);
071    quotaCache.start();
072    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
073    LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
074  }
075
076  public void stop() {
077    if (isQuotaEnabled()) {
078      quotaCache.stop("shutdown");
079    }
080  }
081
082  protected boolean isRpcThrottleEnabled() {
083    return rpcThrottleEnabled;
084  }
085
086  private boolean isQuotaEnabled() {
087    return quotaCache != null;
088  }
089
090  public void switchRpcThrottle(boolean enable) throws IOException {
091    if (isQuotaEnabled()) {
092      if (rpcThrottleEnabled != enable) {
093        boolean previousEnabled = rpcThrottleEnabled;
094        rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
095        LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
096      } else {
097        LOG.warn(
098          "Skip switch rpc throttle because previous value {} is the same as current value {}",
099          rpcThrottleEnabled, enable);
100      }
101    } else {
102      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
103    }
104  }
105
106  QuotaCache getQuotaCache() {
107    return quotaCache;
108  }
109
110  /**
111   * Returns the quota for an operation.
112   * @param ugi   the user that is executing the operation
113   * @param table the table where the operation will be executed
114   * @return the OperationQuota
115   */
116  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
117    if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
118      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
119      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
120      boolean useNoop = userLimiter.isBypass();
121      if (userQuotaState.hasBypassGlobals()) {
122        if (LOG.isTraceEnabled()) {
123          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
124        }
125        if (!useNoop) {
126          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
127        }
128      } else {
129        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
130        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
131        QuotaLimiter rsLimiter =
132          quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
133        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
134        boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
135        if (LOG.isTraceEnabled()) {
136          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
137            + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
138            + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled);
139        }
140        if (!useNoop) {
141          if (exceedThrottleQuotaEnabled) {
142            return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
143              userLimiter, tableLimiter, nsLimiter);
144          } else {
145            return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
146              tableLimiter, nsLimiter, rsLimiter);
147          }
148        }
149      }
150    }
151    return NoopOperationQuota.get();
152  }
153
154  /**
155   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
156   * available quota and to report the data/usage of the operation.
157   * @param region the region where the operation will be performed
158   * @param type   the operation type
159   * @return the OperationQuota
160   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
161   */
162  public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
163    throws IOException, RpcThrottlingException {
164    switch (type) {
165      case SCAN:
166        return checkQuota(region, 0, 0, 1);
167      case GET:
168        return checkQuota(region, 0, 1, 0);
169      case MUTATE:
170        return checkQuota(region, 1, 0, 0);
171    }
172    throw new RuntimeException("Invalid operation type: " + type);
173  }
174
175  /**
176   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
177   * available quota and to report the data/usage of the operation.
178   * @param region  the region where the operation will be performed
179   * @param actions the "multi" actions to perform
180   * @return the OperationQuota
181   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
182   */
183  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
184    throws IOException, RpcThrottlingException {
185    int numWrites = 0;
186    int numReads = 0;
187    for (final ClientProtos.Action action : actions) {
188      if (action.hasMutation()) {
189        numWrites++;
190      } else if (action.hasGet()) {
191        numReads++;
192      }
193    }
194    return checkQuota(region, numWrites, numReads, 0);
195  }
196
197  /**
198   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
199   * available quota and to report the data/usage of the operation.
200   * @param region    the region where the operation will be performed
201   * @param numWrites number of writes to perform
202   * @param numReads  number of short-reads to perform
203   * @param numScans  number of scan to perform
204   * @return the OperationQuota
205   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
206   */
207  private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
208    final int numScans) throws IOException, RpcThrottlingException {
209    Optional<User> user = RpcServer.getRequestUser();
210    UserGroupInformation ugi;
211    if (user.isPresent()) {
212      ugi = user.get().getUGI();
213    } else {
214      ugi = User.getCurrent().getUGI();
215    }
216    TableName table = region.getTableDescriptor().getTableName();
217
218    OperationQuota quota = getQuota(ugi, table);
219    try {
220      quota.checkQuota(numWrites, numReads, numScans);
221    } catch (RpcThrottlingException e) {
222      LOG.debug(
223        "Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites="
224          + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + e.getMessage());
225      throw e;
226    }
227    return quota;
228  }
229}