001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.Optional; 023import org.apache.hadoop.hbase.TableName; 024import org.apache.hadoop.hbase.ipc.RpcScheduler; 025import org.apache.hadoop.hbase.ipc.RpcServer; 026import org.apache.hadoop.hbase.regionserver.Region; 027import org.apache.hadoop.hbase.regionserver.RegionServerServices; 028import org.apache.hadoop.hbase.security.User; 029import org.apache.hadoop.security.UserGroupInformation; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.yetus.audience.InterfaceStability; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 036 037/** 038 * Region Server Quota Manager. It is responsible to provide access to the quota information of each 039 * user/table. The direct user of this class is the RegionServer that will get and check the 040 * user/table quota for each operation (put, get, scan). For system tables and user/table with a 041 * quota specified, the quota check will be a noop. 042 */ 043@InterfaceAudience.Private 044@InterfaceStability.Evolving 045public class RegionServerRpcQuotaManager { 046 private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); 047 048 private final RegionServerServices rsServices; 049 050 private QuotaCache quotaCache = null; 051 private volatile boolean rpcThrottleEnabled; 052 // Storage for quota rpc throttle 053 private RpcThrottleStorage rpcThrottleStorage; 054 055 public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { 056 this.rsServices = rsServices; 057 rpcThrottleStorage = 058 new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration()); 059 } 060 061 public void start(final RpcScheduler rpcScheduler) throws IOException { 062 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 063 LOG.info("Quota support disabled"); 064 return; 065 } 066 067 LOG.info("Initializing RPC quota support"); 068 069 // Initialize quota cache 070 quotaCache = new QuotaCache(rsServices); 071 quotaCache.start(); 072 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 073 LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled); 074 } 075 076 public void stop() { 077 if (isQuotaEnabled()) { 078 quotaCache.stop("shutdown"); 079 } 080 } 081 082 protected boolean isRpcThrottleEnabled() { 083 return rpcThrottleEnabled; 084 } 085 086 private boolean isQuotaEnabled() { 087 return quotaCache != null; 088 } 089 090 public void switchRpcThrottle(boolean enable) throws IOException { 091 if (isQuotaEnabled()) { 092 if (rpcThrottleEnabled != enable) { 093 boolean previousEnabled = rpcThrottleEnabled; 094 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 095 LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled); 096 } else { 097 LOG.warn( 098 "Skip switch rpc throttle because previous value {} is the same as current value {}", 099 rpcThrottleEnabled, enable); 100 } 101 } else { 102 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable); 103 } 104 } 105 106 QuotaCache getQuotaCache() { 107 return quotaCache; 108 } 109 110 /** 111 * Returns the quota for an operation. 112 * @param ugi the user that is executing the operation 113 * @param table the table where the operation will be executed 114 * @return the OperationQuota 115 */ 116 public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { 117 if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { 118 UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); 119 QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); 120 boolean useNoop = userLimiter.isBypass(); 121 if (userQuotaState.hasBypassGlobals()) { 122 if (LOG.isTraceEnabled()) { 123 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); 124 } 125 if (!useNoop) { 126 return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter); 127 } 128 } else { 129 QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); 130 QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); 131 QuotaLimiter rsLimiter = 132 quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY); 133 useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass(); 134 boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled(); 135 if (LOG.isTraceEnabled()) { 136 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter 137 + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter=" 138 + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled); 139 } 140 if (!useNoop) { 141 if (exceedThrottleQuotaEnabled) { 142 return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter, 143 userLimiter, tableLimiter, nsLimiter); 144 } else { 145 return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter, 146 tableLimiter, nsLimiter, rsLimiter); 147 } 148 } 149 } 150 } 151 return NoopOperationQuota.get(); 152 } 153 154 /** 155 * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the 156 * available quota and to report the data/usage of the operation. 157 * @param region the region where the operation will be performed 158 * @param type the operation type 159 * @return the OperationQuota 160 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 161 */ 162 public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type) 163 throws IOException, RpcThrottlingException { 164 switch (type) { 165 case SCAN: 166 return checkQuota(region, 0, 0, 1); 167 case GET: 168 return checkQuota(region, 0, 1, 0); 169 case MUTATE: 170 return checkQuota(region, 1, 0, 0); 171 } 172 throw new RuntimeException("Invalid operation type: " + type); 173 } 174 175 /** 176 * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the 177 * available quota and to report the data/usage of the operation. 178 * @param region the region where the operation will be performed 179 * @param actions the "multi" actions to perform 180 * @return the OperationQuota 181 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 182 */ 183 public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions) 184 throws IOException, RpcThrottlingException { 185 int numWrites = 0; 186 int numReads = 0; 187 for (final ClientProtos.Action action : actions) { 188 if (action.hasMutation()) { 189 numWrites++; 190 } else if (action.hasGet()) { 191 numReads++; 192 } 193 } 194 return checkQuota(region, numWrites, numReads, 0); 195 } 196 197 /** 198 * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the 199 * available quota and to report the data/usage of the operation. 200 * @param region the region where the operation will be performed 201 * @param numWrites number of writes to perform 202 * @param numReads number of short-reads to perform 203 * @param numScans number of scan to perform 204 * @return the OperationQuota 205 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 206 */ 207 private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads, 208 final int numScans) throws IOException, RpcThrottlingException { 209 Optional<User> user = RpcServer.getRequestUser(); 210 UserGroupInformation ugi; 211 if (user.isPresent()) { 212 ugi = user.get().getUGI(); 213 } else { 214 ugi = User.getCurrent().getUGI(); 215 } 216 TableName table = region.getTableDescriptor().getTableName(); 217 218 OperationQuota quota = getQuota(ugi, table); 219 try { 220 quota.checkQuota(numWrites, numReads, numScans); 221 } catch (RpcThrottlingException e) { 222 LOG.debug( 223 "Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" 224 + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + e.getMessage()); 225 throw e; 226 } 227 return quota; 228 } 229}