001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import org.apache.hadoop.hbase.TableName;
024import org.apache.hadoop.hbase.client.TableDescriptor;
025import org.apache.hadoop.hbase.ipc.RpcScheduler;
026import org.apache.hadoop.hbase.ipc.RpcServer;
027import org.apache.hadoop.hbase.regionserver.Region;
028import org.apache.hadoop.hbase.regionserver.RegionServerServices;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.security.UserGroupInformation;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.apache.yetus.audience.InterfaceStability;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
037
038/**
039 * Region Server Quota Manager. It is responsible to provide access to the quota information of each
040 * user/table. The direct user of this class is the RegionServer that will get and check the
041 * user/table quota for each operation (put, get, scan). For system tables and user/table with a
042 * quota specified, the quota check will be a noop.
043 */
044@InterfaceAudience.Private
045@InterfaceStability.Evolving
046public class RegionServerRpcQuotaManager {
047  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
048
049  private final RegionServerServices rsServices;
050
051  private QuotaCache quotaCache = null;
052  private volatile boolean rpcThrottleEnabled;
053  // Storage for quota rpc throttle
054  private RpcThrottleStorage rpcThrottleStorage;
055
056  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
057    this.rsServices = rsServices;
058    rpcThrottleStorage =
059      new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
060  }
061
062  public void start(final RpcScheduler rpcScheduler) throws IOException {
063    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
064      LOG.info("Quota support disabled");
065      return;
066    }
067
068    LOG.info("Initializing RPC quota support");
069
070    // Initialize quota cache
071    quotaCache = new QuotaCache(rsServices);
072    quotaCache.start();
073    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
074    LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
075  }
076
077  public void stop() {
078    if (isQuotaEnabled()) {
079      quotaCache.stop("shutdown");
080    }
081  }
082
083  protected boolean isRpcThrottleEnabled() {
084    return rpcThrottleEnabled;
085  }
086
087  private boolean isQuotaEnabled() {
088    return quotaCache != null;
089  }
090
091  public void switchRpcThrottle(boolean enable) throws IOException {
092    if (isQuotaEnabled()) {
093      if (rpcThrottleEnabled != enable) {
094        boolean previousEnabled = rpcThrottleEnabled;
095        rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
096        LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
097      } else {
098        LOG.warn(
099          "Skip switch rpc throttle because previous value {} is the same as current value {}",
100          rpcThrottleEnabled, enable);
101      }
102    } else {
103      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
104    }
105  }
106
107  QuotaCache getQuotaCache() {
108    return quotaCache;
109  }
110
111  /**
112   * Returns the quota for an operation.
113   * @param ugi   the user that is executing the operation
114   * @param table the table where the operation will be executed
115   * @return the OperationQuota
116   */
117  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
118    final int blockSizeBytes) {
119    if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
120      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
121      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
122      boolean useNoop = userLimiter.isBypass();
123      if (userQuotaState.hasBypassGlobals()) {
124        if (LOG.isTraceEnabled()) {
125          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
126        }
127        if (!useNoop) {
128          return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
129            userLimiter);
130        }
131      } else {
132        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
133        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
134        QuotaLimiter rsLimiter =
135          quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
136        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
137        boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
138        if (LOG.isTraceEnabled()) {
139          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
140            + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
141            + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled);
142        }
143        if (!useNoop) {
144          if (exceedThrottleQuotaEnabled) {
145            return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
146              rsLimiter, userLimiter, tableLimiter, nsLimiter);
147          } else {
148            return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
149              userLimiter, tableLimiter, nsLimiter, rsLimiter);
150          }
151        }
152      }
153    }
154    return NoopOperationQuota.get();
155  }
156
157  /**
158   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
159   * available quota and to report the data/usage of the operation. This method is specific to scans
160   * because estimating a scan's workload is more complicated than estimating the workload of a
161   * get/put.
162   * @param region                          the region where the operation will be performed
163   * @param scanRequest                     the scan to be estimated against the quota
164   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
165   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
166   *                                        scanner
167   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
168   *                                        calls
169   * @return the OperationQuota
170   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
171   */
172  public OperationQuota checkScanQuota(final Region region,
173    final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
174    long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
175    throws IOException, RpcThrottlingException {
176    Optional<User> user = RpcServer.getRequestUser();
177    UserGroupInformation ugi;
178    if (user.isPresent()) {
179      ugi = user.get().getUGI();
180    } else {
181      ugi = User.getCurrent().getUGI();
182    }
183    TableDescriptor tableDescriptor = region.getTableDescriptor();
184    TableName table = tableDescriptor.getTableName();
185
186    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
187    try {
188      quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
189        prevBlockBytesScannedDifference);
190    } catch (RpcThrottlingException e) {
191      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan="
192        + scanRequest.getScannerId() + ": " + e.getMessage());
193      throw e;
194    }
195    return quota;
196  }
197
198  /**
199   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
200   * available quota and to report the data/usage of the operation. This method does not support
201   * scans because estimating a scan's workload is more complicated than estimating the workload of
202   * a get/put.
203   * @param region the region where the operation will be performed
204   * @param type   the operation type
205   * @return the OperationQuota
206   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
207   */
208  public OperationQuota checkBatchQuota(final Region region,
209    final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
210    switch (type) {
211      case GET:
212        return this.checkBatchQuota(region, 0, 1);
213      case MUTATE:
214        return this.checkBatchQuota(region, 1, 0);
215      case CHECK_AND_MUTATE:
216        return this.checkBatchQuota(region, 1, 1);
217    }
218    throw new RuntimeException("Invalid operation type: " + type);
219  }
220
221  /**
222   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
223   * available quota and to report the data/usage of the operation. This method does not support
224   * scans because estimating a scan's workload is more complicated than estimating the workload of
225   * a get/put.
226   * @param region       the region where the operation will be performed
227   * @param actions      the "multi" actions to perform
228   * @param hasCondition whether the RegionAction has a condition
229   * @return the OperationQuota
230   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
231   */
232  public OperationQuota checkBatchQuota(final Region region,
233    final List<ClientProtos.Action> actions, boolean hasCondition)
234    throws IOException, RpcThrottlingException {
235    int numWrites = 0;
236    int numReads = 0;
237    for (final ClientProtos.Action action : actions) {
238      if (action.hasMutation()) {
239        numWrites++;
240        OperationQuota.OperationType operationType =
241          QuotaUtil.getQuotaOperationType(action, hasCondition);
242        if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
243          numReads++;
244        }
245      } else if (action.hasGet()) {
246        numReads++;
247      }
248    }
249    return checkBatchQuota(region, numWrites, numReads);
250  }
251
252  /**
253   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
254   * available quota and to report the data/usage of the operation.
255   * @param region    the region where the operation will be performed
256   * @param numWrites number of writes to perform
257   * @param numReads  number of short-reads to perform
258   * @return the OperationQuota
259   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
260   */
261  private OperationQuota checkBatchQuota(final Region region, final int numWrites,
262    final int numReads) throws IOException, RpcThrottlingException {
263    Optional<User> user = RpcServer.getRequestUser();
264    UserGroupInformation ugi;
265    if (user.isPresent()) {
266      ugi = user.get().getUGI();
267    } else {
268      ugi = User.getCurrent().getUGI();
269    }
270    TableDescriptor tableDescriptor = region.getTableDescriptor();
271    TableName table = tableDescriptor.getTableName();
272
273    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
274    try {
275      quota.checkBatchQuota(numWrites, numReads);
276    } catch (RpcThrottlingException e) {
277      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
278        + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage());
279      throw e;
280    }
281    return quota;
282  }
283}