001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.Optional; 023import java.util.concurrent.TimeUnit; 024import java.util.function.Supplier; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.TableDescriptor; 028import org.apache.hadoop.hbase.conf.ConfigurationObserver; 029import org.apache.hadoop.hbase.ipc.RpcScheduler; 030import org.apache.hadoop.hbase.ipc.RpcServer; 031import org.apache.hadoop.hbase.regionserver.Region; 032import org.apache.hadoop.hbase.regionserver.RegionServerServices; 033import org.apache.hadoop.hbase.security.User; 034import org.apache.hadoop.security.UserGroupInformation; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.yetus.audience.InterfaceStability; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 043 044/** 045 * Region Server Quota Manager. It is responsible to provide access to the quota information of each 046 * user/table. The direct user of this class is the RegionServer that will get and check the 047 * user/table quota for each operation (put, get, scan). For system tables and user/table with a 048 * quota specified, the quota check will be a noop. 049 */ 050@InterfaceAudience.Private 051@InterfaceStability.Evolving 052public class RegionServerRpcQuotaManager implements RpcQuotaManager, ConfigurationObserver { 053 private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); 054 055 private final RegionServerServices rsServices; 056 057 private QuotaCache quotaCache = null; 058 private volatile boolean rpcThrottleEnabled; 059 // Storage for quota rpc throttle 060 private RpcThrottleStorage rpcThrottleStorage; 061 private final Supplier<Double> requestsPerSecondSupplier; 062 063 public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { 064 this.rsServices = rsServices; 065 rpcThrottleStorage = 066 new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration()); 067 this.requestsPerSecondSupplier = Suppliers.memoizeWithExpiration( 068 () -> rsServices.getMetrics().getRegionServerWrapper().getRequestsPerSecond(), 1, 069 TimeUnit.MINUTES); 070 } 071 072 public void start(final RpcScheduler rpcScheduler) throws IOException { 073 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 074 LOG.info("Quota support disabled"); 075 return; 076 } 077 078 LOG.info("Initializing RPC quota support"); 079 080 // Initialize quota cache 081 quotaCache = new QuotaCache(rsServices); 082 quotaCache.start(); 083 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 084 LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled); 085 } 086 087 public void stop() { 088 if (isQuotaEnabled()) { 089 quotaCache.stop("shutdown"); 090 } 091 } 092 093 public void reload() { 094 if (isQuotaEnabled()) { 095 quotaCache.forceSynchronousCacheRefresh(); 096 } 097 } 098 099 @Override 100 public void onConfigurationChange(Configuration conf) { 101 reload(); 102 } 103 104 protected boolean isRpcThrottleEnabled() { 105 return rpcThrottleEnabled; 106 } 107 108 private boolean isQuotaEnabled() { 109 return quotaCache != null; 110 } 111 112 public void switchRpcThrottle(boolean enable) throws IOException { 113 if (isQuotaEnabled()) { 114 if (rpcThrottleEnabled != enable) { 115 boolean previousEnabled = rpcThrottleEnabled; 116 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 117 LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled); 118 } else { 119 LOG.warn( 120 "Skip switch rpc throttle because previous value {} is the same as current value {}", 121 rpcThrottleEnabled, enable); 122 } 123 } else { 124 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable); 125 } 126 } 127 128 QuotaCache getQuotaCache() { 129 return quotaCache; 130 } 131 132 /** 133 * Returns the quota for an operation. 134 * @param ugi the user that is executing the operation 135 * @param table the table where the operation will be executed 136 * @return the OperationQuota 137 */ 138 public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table, 139 final int blockSizeBytes) { 140 if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { 141 UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); 142 QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); 143 144 boolean useNoop = userLimiter.isBypass(); 145 if (userQuotaState.hasBypassGlobals()) { 146 if (LOG.isTraceEnabled()) { 147 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); 148 } 149 if (!useNoop) { 150 return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, 151 requestsPerSecondSupplier.get(), userLimiter); 152 } 153 } else { 154 QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); 155 QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); 156 QuotaLimiter rsLimiter = 157 quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY); 158 useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass(); 159 boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled(); 160 if (LOG.isTraceEnabled()) { 161 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter 162 + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter=" 163 + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled); 164 } 165 if (!useNoop) { 166 if (exceedThrottleQuotaEnabled) { 167 return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, 168 requestsPerSecondSupplier.get(), rsLimiter, userLimiter, tableLimiter, nsLimiter); 169 } else { 170 return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, 171 requestsPerSecondSupplier.get(), userLimiter, tableLimiter, nsLimiter, rsLimiter); 172 } 173 } 174 } 175 } 176 return NoopOperationQuota.get(); 177 } 178 179 @Override 180 public OperationQuota checkScanQuota(final Region region, 181 final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, 182 long maxBlockBytesScanned, long prevBlockBytesScannedDifference) 183 throws IOException, RpcThrottlingException { 184 Optional<User> user = RpcServer.getRequestUser(); 185 UserGroupInformation ugi; 186 if (user.isPresent()) { 187 ugi = user.get().getUGI(); 188 } else { 189 ugi = User.getCurrent().getUGI(); 190 } 191 TableDescriptor tableDescriptor = region.getTableDescriptor(); 192 TableName table = tableDescriptor.getTableName(); 193 194 OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); 195 try { 196 quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, 197 prevBlockBytesScannedDifference); 198 } catch (RpcThrottlingException e) { 199 LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan=" 200 + scanRequest.getScannerId() + ": " + e.getMessage()); 201 202 rsServices.getMetrics().recordThrottleException(e.getType(), ugi.getShortUserName(), 203 table.getNameAsString()); 204 205 throw e; 206 } 207 return quota; 208 } 209 210 @Override 211 public OperationQuota checkBatchQuota(final Region region, 212 final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { 213 switch (type) { 214 case GET: 215 return this.checkBatchQuota(region, 0, 1, false); 216 case MUTATE: 217 return this.checkBatchQuota(region, 1, 0, false); 218 case CHECK_AND_MUTATE: 219 return this.checkBatchQuota(region, 1, 1, true); 220 } 221 throw new RuntimeException("Invalid operation type: " + type); 222 } 223 224 @Override 225 public OperationQuota checkBatchQuota(final Region region, 226 final List<ClientProtos.Action> actions, boolean hasCondition) 227 throws IOException, RpcThrottlingException { 228 int numWrites = 0; 229 int numReads = 0; 230 boolean isAtomic = false; 231 for (final ClientProtos.Action action : actions) { 232 if (action.hasMutation()) { 233 numWrites++; 234 OperationQuota.OperationType operationType = 235 QuotaUtil.getQuotaOperationType(action, hasCondition); 236 if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) { 237 numReads++; 238 // If any mutations in this batch are atomic, we will count the entire batch as atomic. 239 // This is a conservative approach, but it is the best that we can do without knowing 240 // the block bytes scanned of each individual action. 241 isAtomic = true; 242 } 243 } else if (action.hasGet()) { 244 numReads++; 245 } 246 } 247 return checkBatchQuota(region, numWrites, numReads, isAtomic); 248 } 249 250 /** 251 * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the 252 * available quota and to report the data/usage of the operation. 253 * @param region the region where the operation will be performed 254 * @param numWrites number of writes to perform 255 * @param numReads number of short-reads to perform 256 * @return the OperationQuota 257 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 258 */ 259 @Override 260 public OperationQuota checkBatchQuota(final Region region, final int numWrites, 261 final int numReads, boolean isAtomic) throws IOException, RpcThrottlingException { 262 Optional<User> user = RpcServer.getRequestUser(); 263 UserGroupInformation ugi; 264 if (user.isPresent()) { 265 ugi = user.get().getUGI(); 266 } else { 267 ugi = User.getCurrent().getUGI(); 268 } 269 TableDescriptor tableDescriptor = region.getTableDescriptor(); 270 TableName table = tableDescriptor.getTableName(); 271 272 OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); 273 try { 274 quota.checkBatchQuota(numWrites, numReads, isAtomic); 275 } catch (RpcThrottlingException e) { 276 LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table 277 + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); 278 279 rsServices.getMetrics().recordThrottleException(e.getType(), ugi.getShortUserName(), 280 table.getNameAsString()); 281 282 throw e; 283 } 284 return quota; 285 } 286}