001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.Optional;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.yetus.audience.InterfaceStability;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.ipc.RpcScheduler;
031import org.apache.hadoop.hbase.ipc.RpcServer;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
033import org.apache.hadoop.hbase.regionserver.Region;
034import org.apache.hadoop.hbase.regionserver.RegionServerServices;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.security.UserGroupInformation;
037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
038
039/**
040 * Region Server Quota Manager.
041 * It is responsible to provide access to the quota information of each user/table.
042 *
043 * The direct user of this class is the RegionServer that will get and check the
044 * user/table quota for each operation (put, get, scan).
045 * For system tables and user/table with a quota specified, the quota check will be a noop.
046 */
047@InterfaceAudience.Private
048@InterfaceStability.Evolving
049public class RegionServerRpcQuotaManager {
050  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
051
052  private final RegionServerServices rsServices;
053
054  private QuotaCache quotaCache = null;
055
056  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
057    this.rsServices = rsServices;
058  }
059
060  public void start(final RpcScheduler rpcScheduler) throws IOException {
061    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
062      LOG.info("Quota support disabled");
063      return;
064    }
065
066    LOG.info("Initializing RPC quota support");
067
068    // Initialize quota cache
069    quotaCache = new QuotaCache(rsServices);
070    quotaCache.start();
071  }
072
073  public void stop() {
074    if (isQuotaEnabled()) {
075      quotaCache.stop("shutdown");
076    }
077  }
078
079  public boolean isQuotaEnabled() {
080    return quotaCache != null;
081  }
082
083  @VisibleForTesting
084  QuotaCache getQuotaCache() {
085    return quotaCache;
086  }
087
088  /**
089   * Returns the quota for an operation.
090   *
091   * @param ugi the user that is executing the operation
092   * @param table the table where the operation will be executed
093   * @return the OperationQuota
094   */
095  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
096    if (isQuotaEnabled() && !table.isSystemTable()) {
097      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
098      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
099      boolean useNoop = userLimiter.isBypass();
100      if (userQuotaState.hasBypassGlobals()) {
101        if (LOG.isTraceEnabled()) {
102          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
103        }
104        if (!useNoop) {
105          return new DefaultOperationQuota(userLimiter);
106        }
107      } else {
108        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
109        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
110        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
111        if (LOG.isTraceEnabled()) {
112          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
113                    userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
114        }
115        if (!useNoop) {
116          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
117        }
118      }
119    }
120    return NoopOperationQuota.get();
121  }
122
123  /**
124   * Check the quota for the current (rpc-context) user.
125   * Returns the OperationQuota used to get the available quota and
126   * to report the data/usage of the operation.
127   * @param region the region where the operation will be performed
128   * @param type the operation type
129   * @return the OperationQuota
130   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
131   */
132  public OperationQuota checkQuota(final Region region,
133      final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
134    switch (type) {
135      case SCAN:   return checkQuota(region, 0, 0, 1);
136      case GET:    return checkQuota(region, 0, 1, 0);
137      case MUTATE: return checkQuota(region, 1, 0, 0);
138    }
139    throw new RuntimeException("Invalid operation type: " + type);
140  }
141
142  /**
143   * Check the quota for the current (rpc-context) user.
144   * Returns the OperationQuota used to get the available quota and
145   * to report the data/usage of the operation.
146   * @param region the region where the operation will be performed
147   * @param actions the "multi" actions to perform
148   * @return the OperationQuota
149   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
150   */
151  public OperationQuota checkQuota(final Region region,
152      final List<ClientProtos.Action> actions) throws IOException, RpcThrottlingException {
153    int numWrites = 0;
154    int numReads = 0;
155    for (final ClientProtos.Action action: actions) {
156      if (action.hasMutation()) {
157        numWrites++;
158      } else if (action.hasGet()) {
159        numReads++;
160      }
161    }
162    return checkQuota(region, numWrites, numReads, 0);
163  }
164
165  /**
166   * Check the quota for the current (rpc-context) user.
167   * Returns the OperationQuota used to get the available quota and
168   * to report the data/usage of the operation.
169   * @param region the region where the operation will be performed
170   * @param numWrites number of writes to perform
171   * @param numReads number of short-reads to perform
172   * @param numScans number of scan to perform
173   * @return the OperationQuota
174   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
175   */
176  private OperationQuota checkQuota(final Region region,
177      final int numWrites, final int numReads, final int numScans)
178      throws IOException, RpcThrottlingException {
179    Optional<User> user = RpcServer.getRequestUser();
180    UserGroupInformation ugi;
181    if (user.isPresent()) {
182      ugi = user.get().getUGI();
183    } else {
184      ugi = User.getCurrent().getUGI();
185    }
186    TableName table = region.getTableDescriptor().getTableName();
187
188    OperationQuota quota = getQuota(ugi, table);
189    try {
190      quota.checkQuota(numWrites, numReads, numScans);
191    } catch (RpcThrottlingException e) {
192      LOG.debug("Throttling exception for user=" + ugi.getUserName() +
193                " table=" + table + " numWrites=" + numWrites +
194                " numReads=" + numReads + " numScans=" + numScans +
195                ": " + e.getMessage());
196      throw e;
197    }
198    return quota;
199  }
200}