001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.Optional; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.yetus.audience.InterfaceStability; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.ipc.RpcScheduler; 031import org.apache.hadoop.hbase.ipc.RpcServer; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 033import org.apache.hadoop.hbase.regionserver.Region; 034import org.apache.hadoop.hbase.regionserver.RegionServerServices; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.security.UserGroupInformation; 037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 038 039/** 040 * Region Server Quota Manager. 041 * It is responsible to provide access to the quota information of each user/table. 042 * 043 * The direct user of this class is the RegionServer that will get and check the 044 * user/table quota for each operation (put, get, scan). 045 * For system tables and user/table with a quota specified, the quota check will be a noop. 046 */ 047@InterfaceAudience.Private 048@InterfaceStability.Evolving 049public class RegionServerRpcQuotaManager { 050 private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); 051 052 private final RegionServerServices rsServices; 053 054 private QuotaCache quotaCache = null; 055 056 public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { 057 this.rsServices = rsServices; 058 } 059 060 public void start(final RpcScheduler rpcScheduler) throws IOException { 061 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 062 LOG.info("Quota support disabled"); 063 return; 064 } 065 066 LOG.info("Initializing RPC quota support"); 067 068 // Initialize quota cache 069 quotaCache = new QuotaCache(rsServices); 070 quotaCache.start(); 071 } 072 073 public void stop() { 074 if (isQuotaEnabled()) { 075 quotaCache.stop("shutdown"); 076 } 077 } 078 079 public boolean isQuotaEnabled() { 080 return quotaCache != null; 081 } 082 083 @VisibleForTesting 084 QuotaCache getQuotaCache() { 085 return quotaCache; 086 } 087 088 /** 089 * Returns the quota for an operation. 090 * 091 * @param ugi the user that is executing the operation 092 * @param table the table where the operation will be executed 093 * @return the OperationQuota 094 */ 095 public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { 096 if (isQuotaEnabled() && !table.isSystemTable()) { 097 UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); 098 QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); 099 boolean useNoop = userLimiter.isBypass(); 100 if (userQuotaState.hasBypassGlobals()) { 101 if (LOG.isTraceEnabled()) { 102 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); 103 } 104 if (!useNoop) { 105 return new DefaultOperationQuota(userLimiter); 106 } 107 } else { 108 QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); 109 QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); 110 useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass(); 111 if (LOG.isTraceEnabled()) { 112 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + 113 userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); 114 } 115 if (!useNoop) { 116 return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); 117 } 118 } 119 } 120 return NoopOperationQuota.get(); 121 } 122 123 /** 124 * Check the quota for the current (rpc-context) user. 125 * Returns the OperationQuota used to get the available quota and 126 * to report the data/usage of the operation. 127 * @param region the region where the operation will be performed 128 * @param type the operation type 129 * @return the OperationQuota 130 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 131 */ 132 public OperationQuota checkQuota(final Region region, 133 final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { 134 switch (type) { 135 case SCAN: return checkQuota(region, 0, 0, 1); 136 case GET: return checkQuota(region, 0, 1, 0); 137 case MUTATE: return checkQuota(region, 1, 0, 0); 138 } 139 throw new RuntimeException("Invalid operation type: " + type); 140 } 141 142 /** 143 * Check the quota for the current (rpc-context) user. 144 * Returns the OperationQuota used to get the available quota and 145 * to report the data/usage of the operation. 146 * @param region the region where the operation will be performed 147 * @param actions the "multi" actions to perform 148 * @return the OperationQuota 149 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 150 */ 151 public OperationQuota checkQuota(final Region region, 152 final List<ClientProtos.Action> actions) throws IOException, RpcThrottlingException { 153 int numWrites = 0; 154 int numReads = 0; 155 for (final ClientProtos.Action action: actions) { 156 if (action.hasMutation()) { 157 numWrites++; 158 } else if (action.hasGet()) { 159 numReads++; 160 } 161 } 162 return checkQuota(region, numWrites, numReads, 0); 163 } 164 165 /** 166 * Check the quota for the current (rpc-context) user. 167 * Returns the OperationQuota used to get the available quota and 168 * to report the data/usage of the operation. 169 * @param region the region where the operation will be performed 170 * @param numWrites number of writes to perform 171 * @param numReads number of short-reads to perform 172 * @param numScans number of scan to perform 173 * @return the OperationQuota 174 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 175 */ 176 private OperationQuota checkQuota(final Region region, 177 final int numWrites, final int numReads, final int numScans) 178 throws IOException, RpcThrottlingException { 179 Optional<User> user = RpcServer.getRequestUser(); 180 UserGroupInformation ugi; 181 if (user.isPresent()) { 182 ugi = user.get().getUGI(); 183 } else { 184 ugi = User.getCurrent().getUGI(); 185 } 186 TableName table = region.getTableDescriptor().getTableName(); 187 188 OperationQuota quota = getQuota(ugi, table); 189 try { 190 quota.checkQuota(numWrites, numReads, numScans); 191 } catch (RpcThrottlingException e) { 192 LOG.debug("Throttling exception for user=" + ugi.getUserName() + 193 " table=" + table + " numWrites=" + numWrites + 194 " numReads=" + numReads + " numScans=" + numScans + 195 ": " + e.getMessage()); 196 throw e; 197 } 198 return quota; 199 } 200}