/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;

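/**
 * Tests for {@link DefaultOperationQuota}: scan read-size estimation and batch quota checks
 * against a {@link TimeBasedLimiter}, using a {@link ManualEnvironmentEdge} so tests can advance
 * time deterministically between quota refills.
 */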
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDefaultOperationQuota {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);

  private static final Configuration conf = HBaseConfiguration.create();
  private static final int DEFAULT_REQUESTS_PER_SECOND = 1000;
  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
  static {
    envEdge.setValue(EnvironmentEdgeManager.currentTime());
    // only activate the envEdge for the quotas package so tests can advance time manually
    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
      ThrottleQuotaTestUtil.class.getPackage().getName());
  }

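  /*
   * The tests below exercise DefaultOperationQuota.getScanReadConsumeEstimate: a brand new
   * scanner is estimated at one block, a flat workload at the max block bytes scanned so far,
   * a growing workload is overestimated (capped by the max scanner result size), and a
   * shrinking workload only shrinks the estimate back down to the max block bytes scanned.
   */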
  @Test
  public void testScanEstimateNewScanner() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 0;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 0;
    long prevBBSDifference = 0;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // new scanner should estimate scan read as 1 block
    assertEquals(blockSize, estimate);
  }

  @Test
  public void testScanEstimateSecondNextCall() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 1;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;
    long prevBBSDifference = 10 * blockSize;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // 2nd next call should be estimated at maxBBS
    assertEquals(maxBlockBytesScanned, estimate);
  }

  @Test
  public void testScanEstimateFlatWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 100;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;
    long prevBBSDifference = 0;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // flat workload should not overestimate
    assertEquals(maxBlockBytesScanned, estimate);
  }

  @Test
  public void testScanEstimateVariableFlatWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 1;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;
    long prevBBSDifference = 0;
    for (int i = 0; i < 100; i++) {
      long variation = Math.round(Math.random() * blockSize);
      if (variation % 2 == 0) {
        variation *= -1;
      }
      // despite +/- <1 block variation, we consider this workload flat
      prevBBSDifference = variation;

      long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i,
        maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

      // flat workload should not overestimate
      assertEquals(maxBlockBytesScanned, estimate);
    }
  }

  @Test
  public void testScanEstimateGrowingWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 100;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 20 * blockSize;
    long prevBBSDifference = 10 * blockSize;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // a growing workload should overestimate: nextCallSeq * maxBBS, capped at maxScannerResultSize
    assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate);
  }

  @Test
  public void testScanEstimateShrinkingWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 100;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 20 * blockSize;
    long prevBBSDifference = -10 * blockSize;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // shrinking workload should only shrink estimate to maxBBS
    assertEquals(maxBlockBytesScanned, estimate);
  }

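  /*
   * The tests below build a TimeBasedLimiter from a Throttle proto and verify how
   * checkBatchQuota behaves when a batch saturates or oversubscribes the limit: the batch itself
   * is admitted, subsequent requests are rejected with RpcThrottlingException, and
   * envEdge.incValue(1000) advances the clock past the one-second TimeUnit to exercise refill.
   */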
  @Test
  public void testLargeBatchSaturatesReadNumLimit()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use the whole limit
    quota.checkBatchQuota(0, limit, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // after the TimeUnit, the limit should be refilled
    quota.checkBatchQuota(0, limit, false);
  }

  @Test
  public void testLargeBatchSaturatesWriteNumLimit()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use the whole limit
    quota.checkBatchQuota(limit, 0, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));

    envEdge.incValue(1000);
    // after the TimeUnit, the limit should be refilled
    quota.checkBatchQuota(limit, 0, false);
  }

  @Test
  public void testTooLargeReadBatchIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use more than the limit, which should succeed rather than being indefinitely blocked
    quota.checkBatchQuota(0, 10 + limit, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, the request should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false));
  }

  @Test
  public void testTooLargeWriteBatchIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use more than the limit, which should succeed rather than being indefinitely blocked
    quota.checkBatchQuota(10 + limit, 0, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, the request should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false));
  }

  @Test
  public void testTooLargeWriteSizeIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    int limit = 50;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // writes are estimated at 100 bytes, so this will use 2x the limit but should not be blocked
    quota.checkBatchQuota(1, 0, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, the request should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false));
  }

  @Test
  public void testTooLargeReadSizeIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    long blockSize = 65536;
    long limit = blockSize / 2;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
      DEFAULT_REQUESTS_PER_SECOND, limiter);

    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
    quota.checkBatchQuota(0, 1, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, the request should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false));
  }

  @Test
  public void testTooLargeRequestSizeIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    long blockSize = 65536;
    long limit = blockSize / 2;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(conf, throttle);
    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
      DEFAULT_REQUESTS_PER_SECOND, limiter);

    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
    quota.checkBatchQuota(0, 1, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, the request should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false));
  }
}