001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets;
021import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets;
022import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
023import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
024import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
025import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.util.concurrent.Callable;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.RegionServerTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.junit.jupiter.api.AfterAll;
041import org.junit.jupiter.api.AfterEach;
042import org.junit.jupiter.api.BeforeAll;
043import org.junit.jupiter.api.Tag;
044import org.junit.jupiter.api.Test;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048@Tag(RegionServerTests.TAG)
049@Tag(LargeTests.TAG)
050public class TestBlockBytesScannedQuota {
051
052  private final static Logger LOG = LoggerFactory.getLogger(TestBlockBytesScannedQuota.class);
053
054  private static final int REFRESH_TIME = 5000;
055  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056  private static final byte[] FAMILY = Bytes.toBytes("cf");
057  private static final byte[] QUALIFIER = Bytes.toBytes("q");
058
059  private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest");
060  private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024;
061
062  @BeforeAll
063  public static void setUpBeforeClass() throws Exception {
064    // client should fail fast
065    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
066    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
067    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
068      MAX_SCANNER_RESULT_SIZE);
069    TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY,
070      AverageIntervalRateLimiter.class, RateLimiter.class);
071
072    // quotas enabled, using block bytes scanned
073    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
074    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
075
076    // don't cache blocks to make IO predictable
077    TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
078
079    TEST_UTIL.startMiniCluster(1);
080    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
081    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
082    TEST_UTIL.waitTableAvailable(TABLE_NAME);
083  }
084
085  @AfterAll
086  public static void tearDownAfterClass() throws Exception {
087    EnvironmentEdgeManager.reset();
088    TEST_UTIL.deleteTable(TABLE_NAME);
089    TEST_UTIL.shutdownMiniCluster();
090  }
091
092  @AfterEach
093  public void tearDown() throws Exception {
094    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
095  }
096
097  @Test
098  public void testBBSGet() throws Exception {
099    final Admin admin = TEST_UTIL.getAdmin();
100    final String userName = User.getCurrent().getShortName();
101    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
102    Table table = admin.getConnection().getTable(TABLE_NAME);
103
104    doPuts(10_000, FAMILY, QUALIFIER, table);
105    TEST_UTIL.flush(TABLE_NAME);
106
107    // Add ~10 block/sec limit
108    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
109      Math.round(10.1 * blockSize), TimeUnit.SECONDS));
110    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
111
112    // should execute at max 10 requests
113    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
114
115    // wait a minute and you should get another 10 requests executed
116    waitMinuteQuota();
117    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
118
119    // Remove all the limits
120    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
121    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
122    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
123    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
124  }
125
126  @Test
127  public void testBBSScan() throws Exception {
128    final Admin admin = TEST_UTIL.getAdmin();
129    final String userName = User.getCurrent().getShortName();
130    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
131    Table table = admin.getConnection().getTable(TABLE_NAME);
132
133    doPuts(10_000, FAMILY, QUALIFIER, table);
134    TEST_UTIL.flush(TABLE_NAME);
135
136    // Add 1 block/sec limit.
137    // This should only allow 1 scan per minute, because we estimate 1 block per scan
138    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
139      TimeUnit.SECONDS));
140    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
141    waitMinuteQuota();
142
143    // should execute 1 request
144    testTraffic(() -> doScans(5, table, 1), 1, 0);
145
146    // Remove all the limits
147    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
148    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
149    testTraffic(() -> doScans(100, table, 1), 100, 0);
150    testTraffic(() -> doScans(100, table, 1), 100, 0);
151
152    // Add ~3 block/sec limit. This should support >1 scans
153    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
154      Math.round(3.1 * blockSize), TimeUnit.SECONDS));
155    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
156    waitMinuteQuota();
157
158    // Add 50 block/sec limit. This should support >1 scans
159    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
160      Math.round(50.1 * blockSize), TimeUnit.SECONDS));
161    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
162    waitMinuteQuota();
163
164    // This will produce some throttling exceptions, but all/most should succeed within the timeout
165    testTraffic(() -> doScans(100, table, 1), 75, 25);
166    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
167    waitMinuteQuota();
168
169    // With large caching, a big scan should succeed
170    testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0);
171    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
172    waitMinuteQuota();
173
174    // Remove all the limits
175    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
176    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
177    testTraffic(() -> doScans(100, table, 1), 100, 0);
178    testTraffic(() -> doScans(100, table, 1), 100, 0);
179  }
180
181  @Test
182  public void testSmallScanNeverBlockedByLargeEstimate() throws Exception {
183    final Admin admin = TEST_UTIL.getAdmin();
184    final String userName = User.getCurrent().getShortName();
185    Table table = admin.getConnection().getTable(TABLE_NAME);
186
187    doPuts(10_000, FAMILY, QUALIFIER, table);
188    TEST_UTIL.flush(TABLE_NAME);
189
190    // Add 99MB/sec limit.
191    // This should never be blocked, but with a sequence number approaching 10k, without
192    // other intervention, we would estimate a scan workload approaching 625MB or the
193    // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all
194    // requests succeed, so the estimate never becomes large enough to cause read downtime
195    long limit = 99 * 1024 * 1024;
196    assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code
197                                                  // changes
198    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit,
199      TimeUnit.SECONDS));
200    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
201    waitMinuteQuota();
202
203    // should execute all requests
204    testTraffic(() -> doScans(10_000, table, 1), 10_000, 0);
205    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
206    waitMinuteQuota();
207
208    // Remove all the limits
209    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
210    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
211    testTraffic(() -> doScans(100, table, 1), 100, 0);
212    testTraffic(() -> doScans(100, table, 1), 100, 0);
213  }
214
215  @Test
216  public void testBBSMultiGet() throws Exception {
217    final Admin admin = TEST_UTIL.getAdmin();
218    final String userName = User.getCurrent().getShortName();
219    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
220    Table table = admin.getConnection().getTable(TABLE_NAME);
221    int rowCount = 10_000;
222
223    doPuts(rowCount, FAMILY, QUALIFIER, table);
224    TEST_UTIL.flush(TABLE_NAME);
225
226    // Add 1 block/sec limit.
227    // This should only allow 1 multiget per minute, because we estimate 1 block per multiget
228    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
229      TimeUnit.SECONDS));
230    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
231    waitMinuteQuota();
232
233    // should execute 1 request
234    testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1);
235
236    // Remove all the limits
237    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
238    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
239    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
240    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
241
242    // Add ~100 block/sec limit
243    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
244      Math.round(100.1 * blockSize), TimeUnit.SECONDS));
245    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
246
247    // should execute approximately 10 batches of 10 requests
248    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
249
250    // wait a minute and you should get another ~10 batches of 10 requests
251    waitMinuteQuota();
252    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
253
254    // Remove all the limits
255    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
256    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
257    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
258    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
259  }
260
261  private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
262    throws Exception {
263    TEST_UTIL.waitFor(5_000, () -> {
264      long actualSuccess;
265      try {
266        actualSuccess = trafficCallable.call();
267      } catch (Exception e) {
268        throw new RuntimeException(e);
269      }
270      LOG.info("Traffic test yielded {} successful requests. Expected {} +/- {}", actualSuccess,
271        expectedSuccess, marginOfError);
272      boolean success = (actualSuccess >= expectedSuccess - marginOfError)
273        && (actualSuccess <= expectedSuccess + marginOfError);
274      if (!success) {
275        triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
276        waitMinuteQuota();
277      }
278      return success;
279    });
280  }
281}