001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertThrows;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.testclassification.RegionServerTests;
027import org.apache.hadoop.hbase.testclassification.SmallTests;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
030import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
031import org.junit.jupiter.api.Tag;
032import org.junit.jupiter.api.Test;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
036
037@Tag(RegionServerTests.TAG)
038@Tag(SmallTests.TAG)
039public class TestDefaultOperationQuota {
040
041  private static final Configuration conf = HBaseConfiguration.create();
042  private static final int DEFAULT_REQUESTS_PER_SECOND = 1000;
043  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
044  static {
045    envEdge.setValue(EnvironmentEdgeManager.currentTime());
046    // only active the envEdge for quotas package
047    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
048      ThrottleQuotaTestUtil.class.getPackage().getName());
049  }
050
051  @Test
052  public void testScanEstimateNewScanner() {
053    long blockSize = 64 * 1024;
054    long nextCallSeq = 0;
055    long maxScannerResultSize = 100 * 1024 * 1024;
056    long maxBlockBytesScanned = 0;
057    long prevBBSDifference = 0;
058    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
059      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
060
061    // new scanner should estimate scan read as 1 block
062    assertEquals(blockSize, estimate);
063  }
064
065  @Test
066  public void testScanEstimateSecondNextCall() {
067    long blockSize = 64 * 1024;
068    long nextCallSeq = 1;
069    long maxScannerResultSize = 100 * 1024 * 1024;
070    long maxBlockBytesScanned = 10 * blockSize;
071    long prevBBSDifference = 10 * blockSize;
072    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
073      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
074
075    // 2nd next call should be estimated at maxBBS
076    assertEquals(maxBlockBytesScanned, estimate);
077  }
078
079  @Test
080  public void testScanEstimateFlatWorkload() {
081    long blockSize = 64 * 1024;
082    long nextCallSeq = 100;
083    long maxScannerResultSize = 100 * 1024 * 1024;
084    long maxBlockBytesScanned = 10 * blockSize;
085    long prevBBSDifference = 0;
086    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
087      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
088
089    // flat workload should not overestimate
090    assertEquals(maxBlockBytesScanned, estimate);
091  }
092
093  @Test
094  public void testScanEstimateVariableFlatWorkload() {
095    long blockSize = 64 * 1024;
096    long nextCallSeq = 1;
097    long maxScannerResultSize = 100 * 1024 * 1024;
098    long maxBlockBytesScanned = 10 * blockSize;
099    long prevBBSDifference = 0;
100    for (int i = 0; i < 100; i++) {
101      long variation = Math.round(Math.random() * blockSize);
102      if (variation % 2 == 0) {
103        variation *= -1;
104      }
105      // despite +/- <1 block variation, we consider this workload flat
106      prevBBSDifference = variation;
107
108      long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i,
109        maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
110
111      // flat workload should not overestimate
112      assertEquals(maxBlockBytesScanned, estimate);
113    }
114  }
115
116  @Test
117  public void testScanEstimateGrowingWorkload() {
118    long blockSize = 64 * 1024;
119    long nextCallSeq = 100;
120    long maxScannerResultSize = 100 * 1024 * 1024;
121    long maxBlockBytesScanned = 20 * blockSize;
122    long prevBBSDifference = 10 * blockSize;
123    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
124      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
125
126    // growing workload should overestimate
127    assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate);
128  }
129
130  @Test
131  public void testScanEstimateShrinkingWorkload() {
132    long blockSize = 64 * 1024;
133    long nextCallSeq = 100;
134    long maxScannerResultSize = 100 * 1024 * 1024;
135    long maxBlockBytesScanned = 20 * blockSize;
136    long prevBBSDifference = -10 * blockSize;
137    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
138      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
139
140    // shrinking workload should only shrink estimate to maxBBS
141    assertEquals(maxBlockBytesScanned, estimate);
142  }
143
144  @Test
145  public void testLargeBatchSaturatesReadNumLimit()
146    throws RpcThrottlingException, InterruptedException {
147    int limit = 10;
148    QuotaProtos.Throttle throttle =
149      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
150        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
151    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
152    DefaultOperationQuota quota =
153      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
154
155    // use the whole limit
156    quota.checkBatchQuota(0, limit, false);
157
158    // the next request should be rejected
159    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));
160
161    envEdge.incValue(1000);
162    // after the TimeUnit, the limit should be refilled
163    quota.checkBatchQuota(0, limit, false);
164  }
165
166  @Test
167  public void testLargeBatchSaturatesReadWriteLimit()
168    throws RpcThrottlingException, InterruptedException {
169    int limit = 10;
170    QuotaProtos.Throttle throttle =
171      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
172        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
173    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
174    DefaultOperationQuota quota =
175      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
176
177    // use the whole limit
178    quota.checkBatchQuota(limit, 0, false);
179
180    // the next request should be rejected
181    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));
182
183    envEdge.incValue(1000);
184    // after the TimeUnit, the limit should be refilled
185    quota.checkBatchQuota(limit, 0, false);
186  }
187
188  @Test
189  public void testTooLargeReadBatchIsNotBlocked()
190    throws RpcThrottlingException, InterruptedException {
191    int limit = 10;
192    QuotaProtos.Throttle throttle =
193      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
194        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
195    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
196    DefaultOperationQuota quota =
197      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
198
199    // use more than the limit, which should succeed rather than being indefinitely blocked
200    quota.checkBatchQuota(0, 10 + limit, false);
201
202    // the next request should be blocked
203    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));
204
205    envEdge.incValue(1000);
206    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
207    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false));
208  }
209
210  @Test
211  public void testTooLargeWriteBatchIsNotBlocked()
212    throws RpcThrottlingException, InterruptedException {
213    int limit = 10;
214    QuotaProtos.Throttle throttle =
215      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
216        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
217    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
218    DefaultOperationQuota quota =
219      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
220
221    // use more than the limit, which should succeed rather than being indefinitely blocked
222    quota.checkBatchQuota(10 + limit, 0, false);
223
224    // the next request should be blocked
225    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));
226
227    envEdge.incValue(1000);
228    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
229    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false));
230  }
231
232  @Test
233  public void testTooLargeWriteSizeIsNotBlocked()
234    throws RpcThrottlingException, InterruptedException {
235    int limit = 50;
236    QuotaProtos.Throttle throttle =
237      QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
238        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
239    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
240    DefaultOperationQuota quota =
241      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);
242
243    // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked
244    quota.checkBatchQuota(1, 0, false);
245
246    // the next request should be blocked
247    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));
248
249    envEdge.incValue(1000);
250    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
251    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false));
252  }
253
254  @Test
255  public void testTooLargeReadSizeIsNotBlocked()
256    throws RpcThrottlingException, InterruptedException {
257    long blockSize = 65536;
258    long limit = blockSize / 2;
259    QuotaProtos.Throttle throttle =
260      QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
261        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
262    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
263    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
264      DEFAULT_REQUESTS_PER_SECOND, limiter);
265
266    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
267    quota.checkBatchQuota(0, 1, false);
268
269    // the next request should be blocked
270    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));
271
272    envEdge.incValue(1000);
273    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
274    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false));
275  }
276
277  @Test
278  public void testTooLargeRequestSizeIsNotBlocked()
279    throws RpcThrottlingException, InterruptedException {
280    long blockSize = 65536;
281    long limit = blockSize / 2;
282    QuotaProtos.Throttle throttle =
283      QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
284        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
285    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
286    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
287      DEFAULT_REQUESTS_PER_SECOND, limiter);
288
289    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
290    quota.checkBatchQuota(0, 1, false);
291
292    // the next request should be blocked
293    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));
294
295    envEdge.incValue(1000);
296    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
297    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false));
298  }
299}