001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.Optional;
024
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.ipc.RpcScheduler;
027import org.apache.hadoop.hbase.ipc.RpcServer;
028import org.apache.hadoop.hbase.regionserver.Region;
029import org.apache.hadoop.hbase.regionserver.RegionServerServices;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.security.UserGroupInformation;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.yetus.audience.InterfaceStability;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
038
039/**
040 * Region Server Quota Manager.
041 * It is responsible to provide access to the quota information of each user/table.
042 *
043 * The direct user of this class is the RegionServer that will get and check the
044 * user/table quota for each operation (put, get, scan).
045 * For system tables and user/table with a quota specified, the quota check will be a noop.
046 */
047@InterfaceAudience.Private
048@InterfaceStability.Evolving
049public class RegionServerRpcQuotaManager {
050  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
051
052  private final RegionServerServices rsServices;
053
054  private QuotaCache quotaCache = null;
055  private volatile boolean rpcThrottleEnabled;
056  // Storage for quota rpc throttle
057  private RpcThrottleStorage rpcThrottleStorage;
058
059  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
060    this.rsServices = rsServices;
061    rpcThrottleStorage =
062        new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
063  }
064
065  public void start(final RpcScheduler rpcScheduler) throws IOException {
066    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
067      LOG.info("Quota support disabled");
068      return;
069    }
070
071    LOG.info("Initializing RPC quota support");
072
073    // Initialize quota cache
074    quotaCache = new QuotaCache(rsServices);
075    quotaCache.start();
076    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
077    LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
078  }
079
080  public void stop() {
081    if (isQuotaEnabled()) {
082      quotaCache.stop("shutdown");
083    }
084  }
085
086  @VisibleForTesting
087  protected boolean isRpcThrottleEnabled() {
088    return rpcThrottleEnabled;
089  }
090
091  private boolean isQuotaEnabled() {
092    return quotaCache != null;
093  }
094
095  public void switchRpcThrottle(boolean enable) throws IOException {
096    if (isQuotaEnabled()) {
097      if (rpcThrottleEnabled != enable) {
098        boolean previousEnabled = rpcThrottleEnabled;
099        rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
100        LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
101      } else {
102        LOG.warn(
103          "Skip switch rpc throttle because previous value {} is the same as current value {}",
104          rpcThrottleEnabled, enable);
105      }
106    } else {
107      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
108    }
109  }
110
111  @VisibleForTesting
112  QuotaCache getQuotaCache() {
113    return quotaCache;
114  }
115
116  /**
117   * Returns the quota for an operation.
118   *
119   * @param ugi the user that is executing the operation
120   * @param table the table where the operation will be executed
121   * @return the OperationQuota
122   */
123  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
124    if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
125      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
126      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
127      boolean useNoop = userLimiter.isBypass();
128      if (userQuotaState.hasBypassGlobals()) {
129        if (LOG.isTraceEnabled()) {
130          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
131        }
132        if (!useNoop) {
133          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
134        }
135      } else {
136        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
137        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
138        QuotaLimiter rsLimiter = quotaCache
139            .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
140        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
141        boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
142        if (LOG.isTraceEnabled()) {
143          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
144              + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
145              + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled);
146        }
147        if (!useNoop) {
148          if (exceedThrottleQuotaEnabled) {
149            return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
150                userLimiter, tableLimiter, nsLimiter);
151          } else {
152            return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
153                tableLimiter, nsLimiter, rsLimiter);
154          }
155        }
156      }
157    }
158    return NoopOperationQuota.get();
159  }
160
161  /**
162   * Check the quota for the current (rpc-context) user.
163   * Returns the OperationQuota used to get the available quota and
164   * to report the data/usage of the operation.
165   * @param region the region where the operation will be performed
166   * @param type the operation type
167   * @return the OperationQuota
168   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
169   */
170  public OperationQuota checkQuota(final Region region,
171      final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
172    switch (type) {
173      case SCAN:   return checkQuota(region, 0, 0, 1);
174      case GET:    return checkQuota(region, 0, 1, 0);
175      case MUTATE: return checkQuota(region, 1, 0, 0);
176    }
177    throw new RuntimeException("Invalid operation type: " + type);
178  }
179
180  /**
181   * Check the quota for the current (rpc-context) user.
182   * Returns the OperationQuota used to get the available quota and
183   * to report the data/usage of the operation.
184   * @param region the region where the operation will be performed
185   * @param actions the "multi" actions to perform
186   * @return the OperationQuota
187   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
188   */
189  public OperationQuota checkQuota(final Region region,
190      final List<ClientProtos.Action> actions) throws IOException, RpcThrottlingException {
191    int numWrites = 0;
192    int numReads = 0;
193    for (final ClientProtos.Action action: actions) {
194      if (action.hasMutation()) {
195        numWrites++;
196      } else if (action.hasGet()) {
197        numReads++;
198      }
199    }
200    return checkQuota(region, numWrites, numReads, 0);
201  }
202
203  /**
204   * Check the quota for the current (rpc-context) user.
205   * Returns the OperationQuota used to get the available quota and
206   * to report the data/usage of the operation.
207   * @param region the region where the operation will be performed
208   * @param numWrites number of writes to perform
209   * @param numReads number of short-reads to perform
210   * @param numScans number of scan to perform
211   * @return the OperationQuota
212   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
213   */
214  private OperationQuota checkQuota(final Region region,
215      final int numWrites, final int numReads, final int numScans)
216      throws IOException, RpcThrottlingException {
217    Optional<User> user = RpcServer.getRequestUser();
218    UserGroupInformation ugi;
219    if (user.isPresent()) {
220      ugi = user.get().getUGI();
221    } else {
222      ugi = User.getCurrent().getUGI();
223    }
224    TableName table = region.getTableDescriptor().getTableName();
225
226    OperationQuota quota = getQuota(ugi, table);
227    try {
228      quota.checkQuota(numWrites, numReads, numScans);
229    } catch (RpcThrottlingException e) {
230      LOG.debug("Throttling exception for user=" + ugi.getUserName() +
231                " table=" + table + " numWrites=" + numWrites +
232                " numReads=" + numReads + " numScans=" + numScans +
233                ": " + e.getMessage());
234      throw e;
235    }
236    return quota;
237  }
238}