001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import java.util.concurrent.TimeUnit;
024import java.util.function.Supplier;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.TableDescriptor;
028import org.apache.hadoop.hbase.conf.ConfigurationObserver;
029import org.apache.hadoop.hbase.ipc.RpcScheduler;
030import org.apache.hadoop.hbase.ipc.RpcServer;
031import org.apache.hadoop.hbase.regionserver.Region;
032import org.apache.hadoop.hbase.regionserver.RegionServerServices;
033import org.apache.hadoop.hbase.security.User;
034import org.apache.hadoop.security.UserGroupInformation;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.yetus.audience.InterfaceStability;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
043
044/**
045 * Region Server Quota Manager. It is responsible to provide access to the quota information of each
046 * user/table. The direct user of this class is the RegionServer that will get and check the
047 * user/table quota for each operation (put, get, scan). For system tables and user/table with a
048 * quota specified, the quota check will be a noop.
049 */
050@InterfaceAudience.Private
051@InterfaceStability.Evolving
052public class RegionServerRpcQuotaManager implements RpcQuotaManager, ConfigurationObserver {
053  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
054
055  private final RegionServerServices rsServices;
056
057  private QuotaCache quotaCache = null;
058  private volatile boolean rpcThrottleEnabled;
059  // Storage for quota rpc throttle
060  private RpcThrottleStorage rpcThrottleStorage;
061  private final Supplier<Double> requestsPerSecondSupplier;
062
063  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
064    this.rsServices = rsServices;
065    rpcThrottleStorage =
066      new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
067    this.requestsPerSecondSupplier = Suppliers.memoizeWithExpiration(
068      () -> rsServices.getMetrics().getRegionServerWrapper().getRequestsPerSecond(), 1,
069      TimeUnit.MINUTES);
070  }
071
072  public void start(final RpcScheduler rpcScheduler) throws IOException {
073    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
074      LOG.info("Quota support disabled");
075      return;
076    }
077
078    LOG.info("Initializing RPC quota support");
079
080    // Initialize quota cache
081    quotaCache = new QuotaCache(rsServices);
082    quotaCache.start();
083    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
084    LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
085  }
086
087  public void stop() {
088    if (isQuotaEnabled()) {
089      quotaCache.stop("shutdown");
090    }
091  }
092
093  public void reload() {
094    if (isQuotaEnabled()) {
095      quotaCache.forceSynchronousCacheRefresh();
096    }
097  }
098
099  @Override
100  public void onConfigurationChange(Configuration conf) {
101    reload();
102  }
103
104  protected boolean isRpcThrottleEnabled() {
105    return rpcThrottleEnabled;
106  }
107
108  private boolean isQuotaEnabled() {
109    return quotaCache != null;
110  }
111
112  public void switchRpcThrottle(boolean enable) throws IOException {
113    if (isQuotaEnabled()) {
114      if (rpcThrottleEnabled != enable) {
115        boolean previousEnabled = rpcThrottleEnabled;
116        rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
117        LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
118      } else {
119        LOG.warn(
120          "Skip switch rpc throttle because previous value {} is the same as current value {}",
121          rpcThrottleEnabled, enable);
122      }
123    } else {
124      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
125    }
126  }
127
128  QuotaCache getQuotaCache() {
129    return quotaCache;
130  }
131
132  /**
133   * Returns the quota for an operation.
134   * @param ugi   the user that is executing the operation
135   * @param table the table where the operation will be executed
136   * @return the OperationQuota
137   */
138  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
139    final int blockSizeBytes) {
140    if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
141      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
142      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
143
144      boolean useNoop = userLimiter.isBypass();
145      if (userQuotaState.hasBypassGlobals()) {
146        if (LOG.isTraceEnabled()) {
147          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
148        }
149        if (!useNoop) {
150          return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
151            requestsPerSecondSupplier.get(), userLimiter);
152        }
153      } else {
154        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
155        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
156        QuotaLimiter rsLimiter =
157          quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
158        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
159        boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
160        if (LOG.isTraceEnabled()) {
161          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
162            + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
163            + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled);
164        }
165        if (!useNoop) {
166          if (exceedThrottleQuotaEnabled) {
167            return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
168              requestsPerSecondSupplier.get(), rsLimiter, userLimiter, tableLimiter, nsLimiter);
169          } else {
170            return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
171              requestsPerSecondSupplier.get(), userLimiter, tableLimiter, nsLimiter, rsLimiter);
172          }
173        }
174      }
175    }
176    return NoopOperationQuota.get();
177  }
178
179  @Override
180  public OperationQuota checkScanQuota(final Region region,
181    final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
182    long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
183    throws IOException, RpcThrottlingException {
184    Optional<User> user = RpcServer.getRequestUser();
185    UserGroupInformation ugi;
186    if (user.isPresent()) {
187      ugi = user.get().getUGI();
188    } else {
189      ugi = User.getCurrent().getUGI();
190    }
191    TableDescriptor tableDescriptor = region.getTableDescriptor();
192    TableName table = tableDescriptor.getTableName();
193
194    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
195    try {
196      quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
197        prevBlockBytesScannedDifference);
198    } catch (RpcThrottlingException e) {
199      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan="
200        + scanRequest.getScannerId() + ": " + e.getMessage());
201
202      rsServices.getMetrics().recordThrottleException(e.getType(), ugi.getShortUserName(),
203        table.getNameAsString());
204
205      throw e;
206    }
207    return quota;
208  }
209
210  @Override
211  public OperationQuota checkBatchQuota(final Region region,
212    final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
213    switch (type) {
214      case GET:
215        return this.checkBatchQuota(region, 0, 1, false);
216      case MUTATE:
217        return this.checkBatchQuota(region, 1, 0, false);
218      case CHECK_AND_MUTATE:
219        return this.checkBatchQuota(region, 1, 1, true);
220    }
221    throw new RuntimeException("Invalid operation type: " + type);
222  }
223
224  @Override
225  public OperationQuota checkBatchQuota(final Region region,
226    final List<ClientProtos.Action> actions, boolean hasCondition)
227    throws IOException, RpcThrottlingException {
228    int numWrites = 0;
229    int numReads = 0;
230    boolean isAtomic = false;
231    for (final ClientProtos.Action action : actions) {
232      if (action.hasMutation()) {
233        numWrites++;
234        OperationQuota.OperationType operationType =
235          QuotaUtil.getQuotaOperationType(action, hasCondition);
236        if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
237          numReads++;
238          // If any mutations in this batch are atomic, we will count the entire batch as atomic.
239          // This is a conservative approach, but it is the best that we can do without knowing
240          // the block bytes scanned of each individual action.
241          isAtomic = true;
242        }
243      } else if (action.hasGet()) {
244        numReads++;
245      }
246    }
247    return checkBatchQuota(region, numWrites, numReads, isAtomic);
248  }
249
250  /**
251   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
252   * available quota and to report the data/usage of the operation.
253   * @param region    the region where the operation will be performed
254   * @param numWrites number of writes to perform
255   * @param numReads  number of short-reads to perform
256   * @return the OperationQuota
257   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
258   */
259  @Override
260  public OperationQuota checkBatchQuota(final Region region, final int numWrites,
261    final int numReads, boolean isAtomic) throws IOException, RpcThrottlingException {
262    Optional<User> user = RpcServer.getRequestUser();
263    UserGroupInformation ugi;
264    if (user.isPresent()) {
265      ugi = user.get().getUGI();
266    } else {
267      ugi = User.getCurrent().getUGI();
268    }
269    TableDescriptor tableDescriptor = region.getTableDescriptor();
270    TableName table = tableDescriptor.getTableName();
271
272    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
273    try {
274      quota.checkBatchQuota(numWrites, numReads, isAtomic);
275    } catch (RpcThrottlingException e) {
276      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
277        + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage());
278
279      rsServices.getMetrics().recordThrottleException(e.getType(), ugi.getShortUserName(),
280        table.getNameAsString());
281
282      throw e;
283    }
284    return quota;
285  }
286}