001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.Optional; 024 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.ipc.RpcScheduler; 027import org.apache.hadoop.hbase.ipc.RpcServer; 028import org.apache.hadoop.hbase.regionserver.Region; 029import org.apache.hadoop.hbase.regionserver.RegionServerServices; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.security.UserGroupInformation; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.apache.yetus.audience.InterfaceStability; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 038 039/** 040 * Region Server Quota Manager. 041 * It is responsible to provide access to the quota information of each user/table. 042 * 043 * The direct user of this class is the RegionServer that will get and check the 044 * user/table quota for each operation (put, get, scan). 045 * For system tables and user/table with a quota specified, the quota check will be a noop. 046 */ 047@InterfaceAudience.Private 048@InterfaceStability.Evolving 049public class RegionServerRpcQuotaManager { 050 private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); 051 052 private final RegionServerServices rsServices; 053 054 private QuotaCache quotaCache = null; 055 private volatile boolean rpcThrottleEnabled; 056 // Storage for quota rpc throttle 057 private RpcThrottleStorage rpcThrottleStorage; 058 059 public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { 060 this.rsServices = rsServices; 061 rpcThrottleStorage = 062 new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration()); 063 } 064 065 public void start(final RpcScheduler rpcScheduler) throws IOException { 066 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 067 LOG.info("Quota support disabled"); 068 return; 069 } 070 071 LOG.info("Initializing RPC quota support"); 072 073 // Initialize quota cache 074 quotaCache = new QuotaCache(rsServices); 075 quotaCache.start(); 076 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 077 LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled); 078 } 079 080 public void stop() { 081 if (isQuotaEnabled()) { 082 quotaCache.stop("shutdown"); 083 } 084 } 085 086 @VisibleForTesting 087 protected boolean isRpcThrottleEnabled() { 088 return rpcThrottleEnabled; 089 } 090 091 private boolean isQuotaEnabled() { 092 return quotaCache != null; 093 } 094 095 public void switchRpcThrottle(boolean enable) throws IOException { 096 if (isQuotaEnabled()) { 097 if (rpcThrottleEnabled != enable) { 098 boolean previousEnabled = rpcThrottleEnabled; 099 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 100 LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled); 101 } else { 102 LOG.warn( 103 "Skip switch rpc throttle because previous value {} is the same as current value {}", 104 rpcThrottleEnabled, enable); 105 } 106 } else { 107 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable); 108 } 109 } 110 111 @VisibleForTesting 112 QuotaCache getQuotaCache() { 113 return quotaCache; 114 } 115 116 /** 117 * Returns the quota for an operation. 118 * 119 * @param ugi the user that is executing the operation 120 * @param table the table where the operation will be executed 121 * @return the OperationQuota 122 */ 123 public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { 124 if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { 125 UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); 126 QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); 127 boolean useNoop = userLimiter.isBypass(); 128 if (userQuotaState.hasBypassGlobals()) { 129 if (LOG.isTraceEnabled()) { 130 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); 131 } 132 if (!useNoop) { 133 return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter); 134 } 135 } else { 136 QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); 137 QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); 138 QuotaLimiter rsLimiter = quotaCache 139 .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY); 140 useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass(); 141 boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled(); 142 if (LOG.isTraceEnabled()) { 143 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter 144 + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter=" 145 + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled); 146 } 147 if (!useNoop) { 148 if (exceedThrottleQuotaEnabled) { 149 return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter, 150 userLimiter, tableLimiter, nsLimiter); 151 } else { 152 return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter, 153 tableLimiter, nsLimiter, rsLimiter); 154 } 155 } 156 } 157 } 158 return NoopOperationQuota.get(); 159 } 160 161 /** 162 * Check the quota for the current (rpc-context) user. 163 * Returns the OperationQuota used to get the available quota and 164 * to report the data/usage of the operation. 165 * @param region the region where the operation will be performed 166 * @param type the operation type 167 * @return the OperationQuota 168 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 169 */ 170 public OperationQuota checkQuota(final Region region, 171 final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { 172 switch (type) { 173 case SCAN: return checkQuota(region, 0, 0, 1); 174 case GET: return checkQuota(region, 0, 1, 0); 175 case MUTATE: return checkQuota(region, 1, 0, 0); 176 } 177 throw new RuntimeException("Invalid operation type: " + type); 178 } 179 180 /** 181 * Check the quota for the current (rpc-context) user. 182 * Returns the OperationQuota used to get the available quota and 183 * to report the data/usage of the operation. 184 * @param region the region where the operation will be performed 185 * @param actions the "multi" actions to perform 186 * @return the OperationQuota 187 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 188 */ 189 public OperationQuota checkQuota(final Region region, 190 final List<ClientProtos.Action> actions) throws IOException, RpcThrottlingException { 191 int numWrites = 0; 192 int numReads = 0; 193 for (final ClientProtos.Action action: actions) { 194 if (action.hasMutation()) { 195 numWrites++; 196 } else if (action.hasGet()) { 197 numReads++; 198 } 199 } 200 return checkQuota(region, numWrites, numReads, 0); 201 } 202 203 /** 204 * Check the quota for the current (rpc-context) user. 205 * Returns the OperationQuota used to get the available quota and 206 * to report the data/usage of the operation. 207 * @param region the region where the operation will be performed 208 * @param numWrites number of writes to perform 209 * @param numReads number of short-reads to perform 210 * @param numScans number of scan to perform 211 * @return the OperationQuota 212 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 213 */ 214 private OperationQuota checkQuota(final Region region, 215 final int numWrites, final int numReads, final int numScans) 216 throws IOException, RpcThrottlingException { 217 Optional<User> user = RpcServer.getRequestUser(); 218 UserGroupInformation ugi; 219 if (user.isPresent()) { 220 ugi = user.get().getUGI(); 221 } else { 222 ugi = User.getCurrent().getUGI(); 223 } 224 TableName table = region.getTableDescriptor().getTableName(); 225 226 OperationQuota quota = getQuota(ugi, table); 227 try { 228 quota.checkQuota(numWrites, numReads, numScans); 229 } catch (RpcThrottlingException e) { 230 LOG.debug("Throttling exception for user=" + ugi.getUserName() + 231 " table=" + table + " numWrites=" + numWrites + 232 " numReads=" + numReads + " numScans=" + numScans + 233 ": " + e.getMessage()); 234 throw e; 235 } 236 return quota; 237 } 238}