001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertThrows; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseConfiguration; 026import org.apache.hadoop.hbase.testclassification.RegionServerTests; 027import org.apache.hadoop.hbase.testclassification.SmallTests; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 030import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 031import org.junit.jupiter.api.Tag; 032import org.junit.jupiter.api.Test; 033 034import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 036 037@Tag(RegionServerTests.TAG) 038@Tag(SmallTests.TAG) 039public class TestDefaultOperationQuota { 040 041 private static final Configuration conf = HBaseConfiguration.create(); 042 private static final int DEFAULT_REQUESTS_PER_SECOND = 1000; 043 private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); 044 static { 045 envEdge.setValue(EnvironmentEdgeManager.currentTime()); 046 // only active the envEdge for quotas package 047 EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, 048 ThrottleQuotaTestUtil.class.getPackage().getName()); 049 } 050 051 @Test 052 public void testScanEstimateNewScanner() { 053 long blockSize = 64 * 1024; 054 long nextCallSeq = 0; 055 long maxScannerResultSize = 100 * 1024 * 1024; 056 long maxBlockBytesScanned = 0; 057 long prevBBSDifference = 0; 058 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 059 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 060 061 // new scanner should estimate scan read as 1 block 062 assertEquals(blockSize, estimate); 063 } 064 065 @Test 066 public void testScanEstimateSecondNextCall() { 067 long blockSize = 64 * 1024; 068 long nextCallSeq = 1; 069 long maxScannerResultSize = 100 * 1024 * 1024; 070 long maxBlockBytesScanned = 10 * blockSize; 071 long prevBBSDifference = 10 * blockSize; 072 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 073 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 074 075 // 2nd next call should be estimated at maxBBS 076 assertEquals(maxBlockBytesScanned, estimate); 077 } 078 079 @Test 080 public void testScanEstimateFlatWorkload() { 081 long blockSize = 64 * 1024; 082 long nextCallSeq = 100; 083 long maxScannerResultSize = 100 * 1024 * 1024; 084 long maxBlockBytesScanned = 10 * blockSize; 085 long prevBBSDifference = 0; 086 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 087 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 088 089 // flat workload should not overestimate 090 assertEquals(maxBlockBytesScanned, estimate); 091 } 092 093 @Test 094 public void testScanEstimateVariableFlatWorkload() { 095 long blockSize = 64 * 1024; 096 long nextCallSeq = 1; 097 long maxScannerResultSize = 100 * 1024 * 1024; 098 long maxBlockBytesScanned = 10 * blockSize; 099 long prevBBSDifference = 0; 100 for (int i = 0; i < 100; i++) { 101 long variation = Math.round(Math.random() * blockSize); 102 if (variation % 2 == 0) { 103 variation *= -1; 104 } 105 // despite +/- <1 block variation, we consider this workload flat 106 prevBBSDifference = variation; 107 108 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i, 109 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 110 111 // flat workload should not overestimate 112 assertEquals(maxBlockBytesScanned, estimate); 113 } 114 } 115 116 @Test 117 public void testScanEstimateGrowingWorkload() { 118 long blockSize = 64 * 1024; 119 long nextCallSeq = 100; 120 long maxScannerResultSize = 100 * 1024 * 1024; 121 long maxBlockBytesScanned = 20 * blockSize; 122 long prevBBSDifference = 10 * blockSize; 123 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 124 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 125 126 // growing workload should overestimate 127 assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate); 128 } 129 130 @Test 131 public void testScanEstimateShrinkingWorkload() { 132 long blockSize = 64 * 1024; 133 long nextCallSeq = 100; 134 long maxScannerResultSize = 100 * 1024 * 1024; 135 long maxBlockBytesScanned = 20 * blockSize; 136 long prevBBSDifference = -10 * blockSize; 137 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 138 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 139 140 // shrinking workload should only shrink estimate to maxBBS 141 assertEquals(maxBlockBytesScanned, estimate); 142 } 143 144 @Test 145 public void testLargeBatchSaturatesReadNumLimit() 146 throws RpcThrottlingException, InterruptedException { 147 int limit = 10; 148 QuotaProtos.Throttle throttle = 149 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 150 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 151 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 152 DefaultOperationQuota quota = 153 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 154 155 // use the whole limit 156 quota.checkBatchQuota(0, limit, false); 157 158 // the next request should be rejected 159 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 160 161 envEdge.incValue(1000); 162 // after the TimeUnit, the limit should be refilled 163 quota.checkBatchQuota(0, limit, false); 164 } 165 166 @Test 167 public void testLargeBatchSaturatesReadWriteLimit() 168 throws RpcThrottlingException, InterruptedException { 169 int limit = 10; 170 QuotaProtos.Throttle throttle = 171 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 172 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 173 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 174 DefaultOperationQuota quota = 175 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 176 177 // use the whole limit 178 quota.checkBatchQuota(limit, 0, false); 179 180 // the next request should be rejected 181 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 182 183 envEdge.incValue(1000); 184 // after the TimeUnit, the limit should be refilled 185 quota.checkBatchQuota(limit, 0, false); 186 } 187 188 @Test 189 public void testTooLargeReadBatchIsNotBlocked() 190 throws RpcThrottlingException, InterruptedException { 191 int limit = 10; 192 QuotaProtos.Throttle throttle = 193 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 194 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 195 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 196 DefaultOperationQuota quota = 197 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 198 199 // use more than the limit, which should succeed rather than being indefinitely blocked 200 quota.checkBatchQuota(0, 10 + limit, false); 201 202 // the next request should be blocked 203 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 204 205 envEdge.incValue(1000); 206 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 207 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false)); 208 } 209 210 @Test 211 public void testTooLargeWriteBatchIsNotBlocked() 212 throws RpcThrottlingException, InterruptedException { 213 int limit = 10; 214 QuotaProtos.Throttle throttle = 215 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 216 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 217 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 218 DefaultOperationQuota quota = 219 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 220 221 // use more than the limit, which should succeed rather than being indefinitely blocked 222 quota.checkBatchQuota(10 + limit, 0, false); 223 224 // the next request should be blocked 225 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 226 227 envEdge.incValue(1000); 228 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 229 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); 230 } 231 232 @Test 233 public void testTooLargeWriteSizeIsNotBlocked() 234 throws RpcThrottlingException, InterruptedException { 235 int limit = 50; 236 QuotaProtos.Throttle throttle = 237 QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() 238 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 239 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 240 DefaultOperationQuota quota = 241 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 242 243 // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked 244 quota.checkBatchQuota(1, 0, false); 245 246 // the next request should be blocked 247 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 248 249 envEdge.incValue(1000); 250 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 251 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); 252 } 253 254 @Test 255 public void testTooLargeReadSizeIsNotBlocked() 256 throws RpcThrottlingException, InterruptedException { 257 long blockSize = 65536; 258 long limit = blockSize / 2; 259 QuotaProtos.Throttle throttle = 260 QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() 261 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 262 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 263 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, 264 DEFAULT_REQUESTS_PER_SECOND, limiter); 265 266 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 267 quota.checkBatchQuota(0, 1, false); 268 269 // the next request should be blocked 270 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 271 272 envEdge.incValue(1000); 273 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 274 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); 275 } 276 277 @Test 278 public void testTooLargeRequestSizeIsNotBlocked() 279 throws RpcThrottlingException, InterruptedException { 280 long blockSize = 65536; 281 long limit = blockSize / 2; 282 QuotaProtos.Throttle throttle = 283 QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() 284 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 285 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle); 286 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, 287 DEFAULT_REQUESTS_PER_SECOND, limiter); 288 289 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 290 quota.checkBatchQuota(0, 1, false); 291 292 // the next request should be blocked 293 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 294 295 envEdge.incValue(1000); 296 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 297 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); 298 } 299}