001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Objects; 025import java.util.Random; 026import java.util.Set; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Increment; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.ResultScanner; 034import org.apache.hadoop.hbase.client.Scan; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.security.User; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@InterfaceAudience.Private 047public final class ThrottleQuotaTestUtil { 048 049 private final static Logger LOG = LoggerFactory.getLogger(ThrottleQuotaTestUtil.class); 050 private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); 051 private final static int REFRESH_TIME = 30 * 60000; 052 static { 053 envEdge.setValue(EnvironmentEdgeManager.currentTime()); 054 // only active the envEdge for quotas package 055 EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, 056 ThrottleQuotaTestUtil.class.getPackage().getName()); 057 } 058 059 private ThrottleQuotaTestUtil() { 060 // Hide utility class constructor 061 LOG.debug("Call constructor of ThrottleQuotaTestUtil"); 062 } 063 064 static int doPuts(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { 065 return doPuts(maxOps, -1, family, qualifier, tables); 066 } 067 068 static int doPuts(int maxOps, int valueSize, byte[] family, byte[] qualifier, 069 final Table... tables) { 070 int count = 0; 071 try { 072 while (count < maxOps) { 073 Put put = new Put(Bytes.toBytes("row-" + count)); 074 byte[] value; 075 if (valueSize < 0) { 076 value = Bytes.toBytes("data-" + count); 077 } else { 078 value = generateValue(valueSize); 079 } 080 put.addColumn(family, qualifier, value); 081 for (final Table table : tables) { 082 table.put(put); 083 } 084 count += tables.length; 085 } 086 } catch (IOException e) { 087 LOG.error("put failed after nRetries=" + count, e); 088 } 089 return count; 090 } 091 092 private static byte[] generateValue(int valueSize) { 093 byte[] bytes = new byte[valueSize]; 094 for (int i = 0; i < valueSize; i++) { 095 bytes[i] = 'a'; 096 } 097 return bytes; 098 } 099 100 static long doGets(int maxOps, final Table... tables) { 101 int count = 0; 102 try { 103 while (count < maxOps) { 104 Get get = new Get(Bytes.toBytes("row-" + count)); 105 for (final Table table : tables) { 106 table.get(get); 107 } 108 count += tables.length; 109 } 110 } catch (IOException e) { 111 LOG.error("get failed after nRetries=" + count, e); 112 } 113 return count; 114 } 115 116 static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { 117 int count = 0; 118 try { 119 while (count < maxOps) { 120 Get get = new Get(Bytes.toBytes("row-" + count)); 121 get.addColumn(family, qualifier); 122 for (final Table table : tables) { 123 table.get(get); 124 } 125 count += tables.length; 126 } 127 } catch (IOException e) { 128 LOG.error("get failed after nRetries=" + count, e); 129 } 130 return count; 131 } 132 133 static long doIncrements(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { 134 int count = 0; 135 try { 136 while (count < maxOps) { 137 Increment inc = new Increment(Bytes.toBytes("row-" + count)); 138 inc.addColumn(family, qualifier, 1L); 139 for (final Table table : tables) { 140 table.increment(inc); 141 } 142 count += tables.length; 143 } 144 } catch (IOException e) { 145 LOG.error("increment failed after nRetries=" + count, e); 146 } 147 return count; 148 } 149 150 static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier, 151 final Table... tables) { 152 int opCount = 0; 153 Random random = new Random(); 154 try { 155 while (opCount < maxOps) { 156 List<Get> gets = new ArrayList<>(batchSize); 157 while (gets.size() < batchSize) { 158 Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount))); 159 get.addColumn(family, qualifier); 160 gets.add(get); 161 } 162 for (final Table table : tables) { 163 table.get(gets); 164 } 165 opCount += tables.length; 166 } 167 } catch (IOException e) { 168 LOG.error("multiget failed after nRetries=" + opCount, e); 169 } 170 return opCount; 171 } 172 173 static long doScans(int desiredRows, Table table, int caching) { 174 int count = 0; 175 try { 176 Scan scan = new Scan(); 177 scan.setCaching(caching); 178 scan.setCacheBlocks(false); 179 ResultScanner scanner = table.getScanner(scan); 180 while (count < desiredRows) { 181 scanner.next(); 182 count += 1; 183 } 184 } catch (IOException e) { 185 LOG.error("scan failed after nRetries=" + count, e); 186 } 187 return count; 188 } 189 190 static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass, 191 TableName... tables) throws Exception { 192 triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables); 193 } 194 195 static void triggerTableCacheRefresh(HBaseTestingUtil testUtil, boolean bypass, 196 TableName... tables) throws Exception { 197 triggerCacheRefresh(testUtil, bypass, false, true, false, false, false, tables); 198 } 199 200 static void triggerNamespaceCacheRefresh(HBaseTestingUtil testUtil, boolean bypass, 201 TableName... tables) throws Exception { 202 triggerCacheRefresh(testUtil, bypass, false, false, true, false, false, tables); 203 } 204 205 static void triggerRegionServerCacheRefresh(HBaseTestingUtil testUtil, boolean bypass) 206 throws Exception { 207 triggerCacheRefresh(testUtil, bypass, false, false, false, true, false); 208 } 209 210 static void triggerExceedThrottleQuotaCacheRefresh(HBaseTestingUtil testUtil, 211 boolean exceedEnabled) throws Exception { 212 triggerCacheRefresh(testUtil, exceedEnabled, false, false, false, false, true); 213 } 214 215 private static void triggerCacheRefresh(HBaseTestingUtil testUtil, boolean bypass, 216 boolean userLimiter, boolean tableLimiter, boolean nsLimiter, boolean rsLimiter, 217 boolean exceedThrottleQuota, final TableName... tables) throws Exception { 218 envEdge.incValue(2 * REFRESH_TIME); 219 for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) { 220 RegionServerRpcQuotaManager quotaManager = 221 rst.getRegionServer().getRegionServerRpcQuotaManager(); 222 QuotaCache quotaCache = quotaManager.getQuotaCache(); 223 quotaCache.forceSynchronousCacheRefresh(); 224 Thread.sleep(250); 225 testUtil.waitFor(60000, 250, new ExplainingPredicate<Exception>() { 226 227 @Override 228 public boolean evaluate() throws Exception { 229 boolean isUpdated = true; 230 for (TableName table : tables) { 231 if (userLimiter) { 232 boolean isUserBypass = 233 quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass(); 234 if (isUserBypass != bypass) { 235 LOG.info( 236 "User limiter for user={}, table={} not refreshed, bypass expected {}, actual {}", 237 User.getCurrent(), table, bypass, isUserBypass); 238 envEdge.incValue(100); 239 isUpdated = false; 240 break; 241 } 242 } 243 if (tableLimiter) { 244 boolean isTableBypass = quotaCache.getTableLimiter(table).isBypass(); 245 if (isTableBypass != bypass) { 246 LOG.info("Table limiter for table={} not refreshed, bypass expected {}, actual {}", 247 table, bypass, isTableBypass); 248 envEdge.incValue(100); 249 isUpdated = false; 250 break; 251 } 252 } 253 if (nsLimiter) { 254 boolean isNsBypass = 255 quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass(); 256 if (isNsBypass != bypass) { 257 LOG.info( 258 "Namespace limiter for namespace={} not refreshed, bypass expected {}, actual {}", 259 table.getNamespaceAsString(), bypass, isNsBypass); 260 envEdge.incValue(100); 261 isUpdated = false; 262 break; 263 } 264 } 265 } 266 if (rsLimiter) { 267 boolean rsIsBypass = quotaCache 268 .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass(); 269 if (rsIsBypass != bypass) { 270 LOG.info("RegionServer limiter not refreshed, bypass expected {}, actual {}", bypass, 271 rsIsBypass); 272 envEdge.incValue(100); 273 isUpdated = false; 274 } 275 } 276 if (exceedThrottleQuota) { 277 if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) { 278 LOG.info("ExceedThrottleQuotaEnabled not refreshed, bypass expected {}, actual {}", 279 bypass, quotaCache.isExceedThrottleQuotaEnabled()); 280 envEdge.incValue(100); 281 isUpdated = false; 282 } 283 } 284 if (isUpdated) { 285 return true; 286 } 287 quotaCache.triggerCacheRefresh(); 288 return false; 289 } 290 291 @Override 292 public String explainFailure() throws Exception { 293 return "Quota cache is still not refreshed"; 294 } 295 }); 296 297 LOG.debug("QuotaCache"); 298 LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache())); 299 LOG.debug(Objects.toString(quotaCache.getTableQuotaCache())); 300 LOG.debug(Objects.toString(quotaCache.getUserQuotaCache())); 301 LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache())); 302 } 303 } 304 305 static Set<QuotaCache> getQuotaCaches(HBaseTestingUtil testUtil) { 306 Set<QuotaCache> quotaCaches = new HashSet<>(); 307 for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) { 308 RegionServerRpcQuotaManager quotaManager = 309 rst.getRegionServer().getRegionServerRpcQuotaManager(); 310 quotaCaches.add(quotaManager.getQuotaCache()); 311 } 312 return quotaCaches; 313 } 314 315 static void waitMinuteQuota() { 316 envEdge.incValue(70000); 317 } 318 319 static void clearQuotaCache(HBaseTestingUtil testUtil) { 320 for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) { 321 RegionServerRpcQuotaManager quotaManager = 322 rst.getRegionServer().getRegionServerRpcQuotaManager(); 323 QuotaCache quotaCache = quotaManager.getQuotaCache(); 324 quotaCache.getNamespaceQuotaCache().clear(); 325 quotaCache.getTableQuotaCache().clear(); 326 quotaCache.getUserQuotaCache().clear(); 327 quotaCache.getRegionServerQuotaCache().clear(); 328 } 329 } 330}