/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers shared by the throttle-quota tests: issuing batches of puts/gets, forcing the
 * region servers' {@link QuotaCache} to refresh, and clearing that cache.
 * <p>
 * Loading this class installs a {@link ManualEnvironmentEdge} through
 * {@link EnvironmentEdgeManagerTestHelper#injectEdge}, seeded with the current time. The helpers
 * below move time forward by incrementing that edge rather than by sleeping.
 */
@InterfaceAudience.Private
public final class ThrottleQuotaTestUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ThrottleQuotaTestUtil.class);
  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
  // Amount by which the manual clock is advanced (doubled) before a forced cache refresh;
  // presumably milliseconds, i.e. 30 minutes -- confirm against the QuotaCache refresh period.
  private static final int REFRESH_TIME = 30 * 60000;

  static {
    envEdge.setValue(EnvironmentEdgeManager.currentTime());
    EnvironmentEdgeManagerTestHelper.injectEdge(envEdge);
  }

  private ThrottleQuotaTestUtil() {
    // Hide utility class constructor
    LOG.debug("Call constructor of ThrottleQuotaTestUtil");
  }

  /**
   * Same as {@link #doPuts(int, int, byte[], byte[], Table...)} with a negative value size, so
   * every cell value is {@code "data-" + count}.
   * @param maxOps    stop once the operation counter reaches this value
   * @param family    column family to write to
   * @param qualifier column qualifier to write to
   * @param tables    tables that receive each put
   * @return the operation counter when the loop stopped
   */
  static int doPuts(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
    return doPuts(maxOps, -1, family, qualifier, tables);
  }

  /**
   * Writes rows {@code "row-" + count} to every given table until the operation counter reaches
   * {@code maxOps} or a put throws an {@link IOException}. The counter is increased by
   * {@code tables.length} after each round, so it may overshoot {@code maxOps}; a round that fails
   * part-way is not counted.
   * @param maxOps    stop once the operation counter reaches this value
   * @param valueSize size of the generated value in bytes; if negative, the value is
   *                  {@code "data-" + count} instead
   * @param family    column family to write to
   * @param qualifier column qualifier to write to
   * @param tables    tables that receive each put; when empty or {@code null} nothing is written
   * @return the operation counter when the loop stopped (0 when there are no tables)
   */
  static int doPuts(int maxOps, int valueSize, byte[] family, byte[] qualifier,
    final Table... tables) {
    if (tables == null || tables.length == 0) {
      // Without tables the counter never advances and the loop below would never terminate.
      return 0;
    }
    int count = 0;
    try {
      while (count < maxOps) {
        Put put = new Put(Bytes.toBytes("row-" + count));
        byte[] value = valueSize < 0 ? Bytes.toBytes("data-" + count) : generateValue(valueSize);
        put.addColumn(family, qualifier, value);
        for (final Table table : tables) {
          table.put(put);
        }
        count += tables.length;
      }
    } catch (IOException e) {
      // The failure is logged rather than rethrown; callers inspect the returned count.
      LOG.error("put failed after nRetries={}", count, e);
    }
    return count;
  }

  /**
   * Builds a value of the requested length filled with the byte {@code 'a'}.
   * @param valueSize number of bytes to generate
   * @return a new array of {@code valueSize} bytes
   */
  private static byte[] generateValue(int valueSize) {
    byte[] bytes = new byte[valueSize];
    Arrays.fill(bytes, (byte) 'a');
    return bytes;
  }

  /**
   * Reads rows {@code "row-" + count} from every given table until the operation counter reaches
   * {@code maxOps} or a get throws an {@link IOException}. The counter is increased by
   * {@code tables.length} after each round, so it may overshoot {@code maxOps}; a round that fails
   * part-way is not counted.
   * @param maxOps stop once the operation counter reaches this value
   * @param tables tables to read from; when empty or {@code null} nothing is read
   * @return the operation counter when the loop stopped (0 when there are no tables)
   */
  static long doGets(int maxOps, final Table... tables) {
    if (tables == null || tables.length == 0) {
      // Without tables the counter never advances and the loop below would never terminate.
      return 0;
    }
    int count = 0;
    try {
      while (count < maxOps) {
        Get get = new Get(Bytes.toBytes("row-" + count));
        for (final Table table : tables) {
          table.get(get);
        }
        count += tables.length;
      }
    } catch (IOException e) {
      // The failure is logged rather than rethrown; callers inspect the returned count.
      LOG.error("get failed after nRetries={}", count, e);
    }
    return count;
  }

  /**
   * Refreshes the quota caches and waits until the current user's limiter for each table reports
   * the expected bypass state.
   * @param testUtil testing utility that owns the mini cluster
   * @param bypass   bypass state to wait for
   * @param tables   tables whose user limiter is checked
   */
  static void triggerUserCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
    TableName... tables) throws Exception {
    triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables);
  }

  /**
   * Refreshes the quota caches and waits until the table limiter of each table reports the
   * expected bypass state.
   * @param testUtil testing utility that owns the mini cluster
   * @param bypass   bypass state to wait for
   * @param tables   tables whose table limiter is checked
   */
  static void triggerTableCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
    TableName... tables) throws Exception {
    triggerCacheRefresh(testUtil, bypass, false, true, false, false, false, tables);
  }

  /**
   * Refreshes the quota caches and waits until the namespace limiter for each table's namespace
   * reports the expected bypass state.
   * @param testUtil testing utility that owns the mini cluster
   * @param bypass   bypass state to wait for
   * @param tables   tables whose namespace limiter is checked
   */
  static void triggerNamespaceCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
    TableName... tables) throws Exception {
    triggerCacheRefresh(testUtil, bypass, false, false, true, false, false, tables);
  }

  /**
   * Refreshes the quota caches and waits until the region server limiter reports the expected
   * bypass state.
   * @param testUtil testing utility that owns the mini cluster
   * @param bypass   bypass state to wait for
   */
  static void triggerRegionServerCacheRefresh(HBaseTestingUtility testUtil, boolean bypass)
    throws Exception {
    triggerCacheRefresh(testUtil, bypass, false, false, false, true, false);
  }

  /**
   * Refreshes the quota caches and waits until the cache's "exceed throttle quota enabled" flag
   * equals {@code exceedEnabled}.
   * @param testUtil      testing utility that owns the mini cluster
   * @param exceedEnabled flag value to wait for
   */
  static void triggerExceedThrottleQuotaCacheRefresh(HBaseTestingUtility testUtil,
    boolean exceedEnabled) throws Exception {
    triggerCacheRefresh(testUtil, exceedEnabled, false, false, false, false, true);
  }

  /**
   * Advances the manual clock by {@code 2 * REFRESH_TIME} and then, for every region server of the
   * mini cluster, keeps triggering a quota cache refresh until all selected checks observe the
   * expected state. Each failed check advances the manual clock by another 100 before retrying.
   * @param testUtil            testing utility that owns the mini cluster
   * @param bypass              expected value: the bypass state of the selected limiters, or the
   *                            expected "exceed throttle quota enabled" flag
   * @param userLimiter         check the current user's limiter for each table
   * @param tableLimiter        check the table limiter for each table
   * @param nsLimiter           check the namespace limiter for each table's namespace
   * @param rsLimiter           check the region server limiter
   * @param exceedThrottleQuota check the "exceed throttle quota enabled" flag
   * @param tables              tables used by the user/table/namespace checks
   */
  private static void triggerCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
    boolean userLimiter, boolean tableLimiter, boolean nsLimiter, boolean rsLimiter,
    boolean exceedThrottleQuota, final TableName... tables) throws Exception {
    envEdge.incValue(2 * REFRESH_TIME);
    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
      RegionServerRpcQuotaManager quotaManager =
        rst.getRegionServer().getRegionServerRpcQuotaManager();
      QuotaCache quotaCache = quotaManager.getQuotaCache();

      quotaCache.triggerCacheRefresh();
      // sleep for cache update
      Thread.sleep(250);

      // Look up each table's limiter once before polling; presumably this makes the cache start
      // tracking the table -- confirm against QuotaCache.
      for (TableName table : tables) {
        quotaCache.getTableLimiter(table);
      }

      boolean isUpdated = false;
      while (!isUpdated) {
        quotaCache.triggerCacheRefresh();
        isUpdated = true;
        for (TableName table : tables) {
          // Starts as true so that unselected limiters do not influence the combined result.
          boolean isBypass = true;
          if (userLimiter) {
            isBypass = quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass();
          }
          if (tableLimiter) {
            isBypass &= quotaCache.getTableLimiter(table).isBypass();
          }
          if (nsLimiter) {
            isBypass &= quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
          }
          if (isBypass != bypass) {
            envEdge.incValue(100);
            isUpdated = false;
            break;
          }
        }
        if (rsLimiter) {
          boolean rsIsBypass = quotaCache
            .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass();
          if (rsIsBypass != bypass) {
            envEdge.incValue(100);
            isUpdated = false;
          }
        }
        if (exceedThrottleQuota) {
          if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) {
            envEdge.incValue(100);
            isUpdated = false;
          }
        }
      }

      LOG.debug("QuotaCache");
      LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache()));
      LOG.debug(Objects.toString(quotaCache.getTableQuotaCache()));
      LOG.debug(Objects.toString(quotaCache.getUserQuotaCache()));
      LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache()));
    }
  }

  /**
   * Advances the manual clock by 70000 (presumably milliseconds, i.e. just over one minute, so a
   * per-minute quota window has elapsed -- confirm against the quota time unit).
   */
  static void waitMinuteQuota() {
    envEdge.incValue(70000);
  }

  /**
   * Clears the namespace, table, user and region server quota caches of every region server in
   * the mini cluster.
   * @param testUtil testing utility that owns the mini cluster
   */
  static void clearQuotaCache(HBaseTestingUtility testUtil) {
    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
      RegionServerRpcQuotaManager quotaManager =
        rst.getRegionServer().getRegionServerRpcQuotaManager();
      QuotaCache quotaCache = quotaManager.getQuotaCache();
      quotaCache.getNamespaceQuotaCache().clear();
      quotaCache.getTableQuotaCache().clear();
      quotaCache.getUserQuotaCache().clear();
      quotaCache.getRegionServerQuotaCache().clear();
    }
  }
}