001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets; 021import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets; 022import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts; 023import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans; 024import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; 025import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027 028import java.util.concurrent.Callable; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.testclassification.LargeTests; 037import org.apache.hadoop.hbase.testclassification.RegionServerTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.junit.jupiter.api.AfterAll; 041import org.junit.jupiter.api.AfterEach; 042import org.junit.jupiter.api.BeforeAll; 043import org.junit.jupiter.api.Tag; 044import org.junit.jupiter.api.Test; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048@Tag(RegionServerTests.TAG) 049@Tag(LargeTests.TAG) 050public class TestBlockBytesScannedQuota { 051 052 private final static Logger LOG = LoggerFactory.getLogger(TestBlockBytesScannedQuota.class); 053 054 private static final int REFRESH_TIME = 5000; 055 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 056 private static final byte[] FAMILY = Bytes.toBytes("cf"); 057 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 058 059 private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest"); 060 private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024; 061 062 @BeforeAll 063 public static void setUpBeforeClass() throws Exception { 064 // client should fail fast 065 TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); 066 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 067 TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 068 MAX_SCANNER_RESULT_SIZE); 069 TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, 070 AverageIntervalRateLimiter.class, RateLimiter.class); 071 072 // quotas enabled, using block bytes scanned 073 TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); 074 TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); 075 076 // don't cache blocks to make IO predictable 077 TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); 078 079 TEST_UTIL.startMiniCluster(1); 080 TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); 081 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 082 TEST_UTIL.waitTableAvailable(TABLE_NAME); 083 } 084 085 @AfterAll 086 public static void tearDownAfterClass() throws Exception { 087 EnvironmentEdgeManager.reset(); 088 TEST_UTIL.deleteTable(TABLE_NAME); 089 TEST_UTIL.shutdownMiniCluster(); 090 } 091 092 @AfterEach 093 public void tearDown() throws Exception { 094 ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); 095 } 096 097 @Test 098 public void testBBSGet() throws Exception { 099 final Admin admin = TEST_UTIL.getAdmin(); 100 final String userName = User.getCurrent().getShortName(); 101 int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); 102 Table table = admin.getConnection().getTable(TABLE_NAME); 103 104 doPuts(10_000, FAMILY, QUALIFIER, table); 105 TEST_UTIL.flush(TABLE_NAME); 106 107 // Add ~10 block/sec limit 108 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE, 109 Math.round(10.1 * blockSize), TimeUnit.SECONDS)); 110 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 111 112 // should execute at max 10 requests 113 testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1); 114 115 // wait a minute and you should get another 10 requests executed 116 waitMinuteQuota(); 117 testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1); 118 119 // Remove all the limits 120 admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); 121 triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 122 testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0); 123 testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0); 124 } 125 126 @Test 127 public void testBBSScan() throws Exception { 128 final Admin admin = TEST_UTIL.getAdmin(); 129 final String userName = User.getCurrent().getShortName(); 130 int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); 131 Table table = admin.getConnection().getTable(TABLE_NAME); 132 133 doPuts(10_000, FAMILY, QUALIFIER, table); 134 TEST_UTIL.flush(TABLE_NAME); 135 136 // Add 1 block/sec limit. 137 // This should only allow 1 scan per minute, because we estimate 1 block per scan 138 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, 139 TimeUnit.SECONDS)); 140 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 141 waitMinuteQuota(); 142 143 // should execute 1 request 144 testTraffic(() -> doScans(5, table, 1), 1, 0); 145 146 // Remove all the limits 147 admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); 148 triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 149 testTraffic(() -> doScans(100, table, 1), 100, 0); 150 testTraffic(() -> doScans(100, table, 1), 100, 0); 151 152 // Add ~3 block/sec limit. This should support >1 scans 153 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, 154 Math.round(3.1 * blockSize), TimeUnit.SECONDS)); 155 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 156 waitMinuteQuota(); 157 158 // Add 50 block/sec limit. This should support >1 scans 159 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, 160 Math.round(50.1 * blockSize), TimeUnit.SECONDS)); 161 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 162 waitMinuteQuota(); 163 164 // This will produce some throttling exceptions, but all/most should succeed within the timeout 165 testTraffic(() -> doScans(100, table, 1), 75, 25); 166 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 167 waitMinuteQuota(); 168 169 // With large caching, a big scan should succeed 170 testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0); 171 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 172 waitMinuteQuota(); 173 174 // Remove all the limits 175 admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); 176 triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 177 testTraffic(() -> doScans(100, table, 1), 100, 0); 178 testTraffic(() -> doScans(100, table, 1), 100, 0); 179 } 180 181 @Test 182 public void testSmallScanNeverBlockedByLargeEstimate() throws Exception { 183 final Admin admin = TEST_UTIL.getAdmin(); 184 final String userName = User.getCurrent().getShortName(); 185 Table table = admin.getConnection().getTable(TABLE_NAME); 186 187 doPuts(10_000, FAMILY, QUALIFIER, table); 188 TEST_UTIL.flush(TABLE_NAME); 189 190 // Add 99MB/sec limit. 191 // This should never be blocked, but with a sequence number approaching 10k, without 192 // other intervention, we would estimate a scan workload approaching 625MB or the 193 // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all 194 // requests succeed, so the estimate never becomes large enough to cause read downtime 195 long limit = 99 * 1024 * 1024; 196 assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code 197 // changes 198 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit, 199 TimeUnit.SECONDS)); 200 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 201 waitMinuteQuota(); 202 203 // should execute all requests 204 testTraffic(() -> doScans(10_000, table, 1), 10_000, 0); 205 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 206 waitMinuteQuota(); 207 208 // Remove all the limits 209 admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); 210 triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 211 testTraffic(() -> doScans(100, table, 1), 100, 0); 212 testTraffic(() -> doScans(100, table, 1), 100, 0); 213 } 214 215 @Test 216 public void testBBSMultiGet() throws Exception { 217 final Admin admin = TEST_UTIL.getAdmin(); 218 final String userName = User.getCurrent().getShortName(); 219 int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); 220 Table table = admin.getConnection().getTable(TABLE_NAME); 221 int rowCount = 10_000; 222 223 doPuts(rowCount, FAMILY, QUALIFIER, table); 224 TEST_UTIL.flush(TABLE_NAME); 225 226 // Add 1 block/sec limit. 227 // This should only allow 1 multiget per minute, because we estimate 1 block per multiget 228 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, 229 TimeUnit.SECONDS)); 230 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 231 waitMinuteQuota(); 232 233 // should execute 1 request 234 testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1); 235 236 // Remove all the limits 237 admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); 238 triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 239 testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); 240 testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); 241 242 // Add ~100 block/sec limit 243 admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, 244 Math.round(100.1 * blockSize), TimeUnit.SECONDS)); 245 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 246 247 // should execute approximately 10 batches of 10 requests 248 testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1); 249 250 // wait a minute and you should get another ~10 batches of 10 requests 251 waitMinuteQuota(); 252 testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1); 253 254 // Remove all the limits 255 admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); 256 triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 257 testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); 258 testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); 259 } 260 261 private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError) 262 throws Exception { 263 TEST_UTIL.waitFor(5_000, () -> { 264 long actualSuccess; 265 try { 266 actualSuccess = trafficCallable.call(); 267 } catch (Exception e) { 268 throw new RuntimeException(e); 269 } 270 LOG.info("Traffic test yielded {} successful requests. Expected {} +/- {}", actualSuccess, 271 expectedSuccess, marginOfError); 272 boolean success = (actualSuccess >= expectedSuccess - marginOfError) 273 && (actualSuccess <= expectedSuccess + marginOfError); 274 if (!success) { 275 triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 276 waitMinuteQuota(); 277 } 278 return success; 279 }); 280 } 281}