001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022import static org.junit.Assert.assertTrue; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.testclassification.RegionServerTests; 028import org.apache.hadoop.hbase.testclassification.SmallTests; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 031import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 032import org.junit.ClassRule; 033import org.junit.Test; 034import org.junit.experimental.categories.Category; 035 036import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 038 039@Category({ RegionServerTests.class, SmallTests.class }) 040public class TestDefaultOperationQuota { 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); 044 045 private static final Configuration conf = HBaseConfiguration.create(); 046 private static final int DEFAULT_REQUESTS_PER_SECOND = 1000; 047 private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); 048 static { 049 envEdge.setValue(EnvironmentEdgeManager.currentTime()); 050 // only active the envEdge for quotas package 051 EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, 052 ThrottleQuotaTestUtil.class.getPackage().getName()); 053 } 054 055 @Test 056 public void testScanEstimateNewScanner() { 057 long blockSize = 64 * 1024; 058 long nextCallSeq = 0; 059 long maxScannerResultSize = 100 * 1024 * 1024; 060 long maxBlockBytesScanned = 0; 061 long prevBBSDifference = 0; 062 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 063 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 064 065 // new scanner should estimate scan read as 1 block 066 assertEquals(blockSize, estimate); 067 } 068 069 @Test 070 public void testScanEstimateSecondNextCall() { 071 long blockSize = 64 * 1024; 072 long nextCallSeq = 1; 073 long maxScannerResultSize = 100 * 1024 * 1024; 074 long maxBlockBytesScanned = 10 * blockSize; 075 long prevBBSDifference = 10 * blockSize; 076 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 077 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 078 079 // 2nd next call should be estimated at maxBBS 080 assertEquals(maxBlockBytesScanned, estimate); 081 } 082 083 @Test 084 public void testScanEstimateFlatWorkload() { 085 long blockSize = 64 * 1024; 086 long nextCallSeq = 100; 087 long maxScannerResultSize = 100 * 1024 * 1024; 088 long maxBlockBytesScanned = 10 * blockSize; 089 long prevBBSDifference = 0; 090 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 091 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 092 093 // flat workload should not overestimate 094 assertEquals(maxBlockBytesScanned, estimate); 095 } 096 097 @Test 098 public void testScanEstimateVariableFlatWorkload() { 099 long blockSize = 64 * 1024; 100 long nextCallSeq = 1; 101 long maxScannerResultSize = 100 * 1024 * 1024; 102 long maxBlockBytesScanned = 10 * blockSize; 103 long prevBBSDifference = 0; 104 for (int i = 0; i < 100; i++) { 105 long variation = Math.round(Math.random() * blockSize); 106 if (variation % 2 == 0) { 107 variation *= -1; 108 } 109 // despite +/- <1 block variation, we consider this workload flat 110 prevBBSDifference = variation; 111 112 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i, 113 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 114 115 // flat workload should not overestimate 116 assertEquals(maxBlockBytesScanned, estimate); 117 } 118 } 119 120 @Test 121 public void testScanEstimateGrowingWorkload() { 122 long blockSize = 64 * 1024; 123 long nextCallSeq = 100; 124 long maxScannerResultSize = 100 * 1024 * 1024; 125 long maxBlockBytesScanned = 20 * blockSize; 126 long prevBBSDifference = 10 * blockSize; 127 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 128 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 129 130 // growing workload should overestimate 131 assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate); 132 } 133 134 @Test 135 public void testScanEstimateShrinkingWorkload() { 136 long blockSize = 64 * 1024; 137 long nextCallSeq = 100; 138 long maxScannerResultSize = 100 * 1024 * 1024; 139 long maxBlockBytesScanned = 20 * blockSize; 140 long prevBBSDifference = -10 * blockSize; 141 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 142 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 143 144 // shrinking workload should only shrink estimate to maxBBS 145 assertEquals(maxBlockBytesScanned, estimate); 146 } 147 148 @Test 149 public void testLargeBatchSaturatesReadNumLimit() 150 throws RpcThrottlingException, InterruptedException { 151 int limit = 10; 152 QuotaProtos.Throttle throttle = 153 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 154 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 155 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 156 DefaultOperationQuota quota = 157 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 158 159 // use the whole limit 160 quota.checkBatchQuota(0, limit, false); 161 162 // the next request should be rejected 163 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 164 165 envEdge.incValue(1000); 166 // after the TimeUnit, the limit should be refilled 167 quota.checkBatchQuota(0, limit, false); 168 } 169 170 @Test 171 public void testLargeBatchSaturatesReadWriteLimit() 172 throws RpcThrottlingException, InterruptedException { 173 int limit = 10; 174 QuotaProtos.Throttle throttle = 175 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 176 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 177 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 178 DefaultOperationQuota quota = 179 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 180 181 // use the whole limit 182 quota.checkBatchQuota(limit, 0, false); 183 184 // the next request should be rejected 185 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 186 187 envEdge.incValue(1000); 188 // after the TimeUnit, the limit should be refilled 189 quota.checkBatchQuota(limit, 0, false); 190 } 191 192 @Test 193 public void testTooLargeReadBatchIsNotBlocked() 194 throws RpcThrottlingException, InterruptedException { 195 int limit = 10; 196 QuotaProtos.Throttle throttle = 197 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 198 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 199 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 200 DefaultOperationQuota quota = 201 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 202 203 // use more than the limit, which should succeed rather than being indefinitely blocked 204 quota.checkBatchQuota(0, 10 + limit, false); 205 206 // the next request should be blocked 207 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 208 209 envEdge.incValue(1000); 210 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 211 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false)); 212 } 213 214 @Test 215 public void testTooLargeWriteBatchIsNotBlocked() 216 throws RpcThrottlingException, InterruptedException { 217 int limit = 10; 218 QuotaProtos.Throttle throttle = 219 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 220 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 221 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 222 DefaultOperationQuota quota = 223 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 224 225 // use more than the limit, which should succeed rather than being indefinitely blocked 226 quota.checkBatchQuota(10 + limit, 0, false); 227 228 // the next request should be blocked 229 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 230 231 envEdge.incValue(1000); 232 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 233 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); 234 } 235 236 @Test 237 public void testTooLargeWriteSizeIsNotBlocked() 238 throws RpcThrottlingException, InterruptedException { 239 int limit = 50; 240 QuotaProtos.Throttle throttle = 241 QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() 242 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 243 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 244 DefaultOperationQuota quota = 245 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 246 247 // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked 248 quota.checkBatchQuota(1, 0, false); 249 250 // the next request should be blocked 251 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 252 253 envEdge.incValue(1000); 254 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 255 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); 256 } 257 258 @Test 259 public void testTooLargeReadSizeIsNotBlocked() 260 throws RpcThrottlingException, InterruptedException { 261 long blockSize = 65536; 262 long limit = blockSize / 2; 263 QuotaProtos.Throttle throttle = 264 QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() 265 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 266 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 267 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, 268 DEFAULT_REQUESTS_PER_SECOND, limiter); 269 270 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 271 quota.checkBatchQuota(0, 1, false); 272 273 // the next request should be blocked 274 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 275 276 envEdge.incValue(1000); 277 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 278 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); 279 } 280 281 @Test 282 public void testTooLargeRequestSizeIsNotBlocked() 283 throws RpcThrottlingException, InterruptedException { 284 long blockSize = 65536; 285 long limit = blockSize / 2; 286 QuotaProtos.Throttle throttle = 287 QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() 288 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 289 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 290 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, 291 DEFAULT_REQUESTS_PER_SECOND, limiter); 292 293 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 294 quota.checkBatchQuota(0, 1, false); 295 296 // the next request should be blocked 297 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 298 299 envEdge.incValue(1000); 300 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 301 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); 302 } 303}