001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets;
021import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets;
022import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
023import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
024import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
025import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
026import static org.junit.Assert.assertTrue;
027
028import java.util.concurrent.Callable;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.junit.After;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050@Category({ RegionServerTests.class, MediumTests.class })
051public class TestBlockBytesScannedQuota {
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestBlockBytesScannedQuota.class);
055
056  private final static Logger LOG = LoggerFactory.getLogger(TestBlockBytesScannedQuota.class);
057
058  private static final int REFRESH_TIME = 5000;
059  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060  private static final byte[] FAMILY = Bytes.toBytes("cf");
061  private static final byte[] QUALIFIER = Bytes.toBytes("q");
062
063  private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest");
064  private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024;
065
066  @BeforeClass
067  public static void setUpBeforeClass() throws Exception {
068    // client should fail fast
069    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
071    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
072      MAX_SCANNER_RESULT_SIZE);
073    TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY,
074      AverageIntervalRateLimiter.class, RateLimiter.class);
075
076    // quotas enabled, using block bytes scanned
077    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
078    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
079
080    // don't cache blocks to make IO predictable
081    TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
082
083    TEST_UTIL.startMiniCluster(1);
084    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
085    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
086    TEST_UTIL.waitTableAvailable(TABLE_NAME);
087    QuotaCache.TEST_FORCE_REFRESH = true;
088  }
089
090  @AfterClass
091  public static void tearDownAfterClass() throws Exception {
092    EnvironmentEdgeManager.reset();
093    TEST_UTIL.deleteTable(TABLE_NAME);
094    TEST_UTIL.shutdownMiniCluster();
095  }
096
097  @After
098  public void tearDown() throws Exception {
099    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
100  }
101
102  @Test
103  public void testBBSGet() throws Exception {
104    final Admin admin = TEST_UTIL.getAdmin();
105    final String userName = User.getCurrent().getShortName();
106    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
107    Table table = admin.getConnection().getTable(TABLE_NAME);
108
109    doPuts(10_000, FAMILY, QUALIFIER, table);
110    TEST_UTIL.flush(TABLE_NAME);
111
112    // Add ~10 block/sec limit
113    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
114      Math.round(10.1 * blockSize), TimeUnit.SECONDS));
115    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
116
117    // should execute at max 10 requests
118    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
119
120    // wait a minute and you should get another 10 requests executed
121    waitMinuteQuota();
122    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
123
124    // Remove all the limits
125    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
126    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
127    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
128    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
129  }
130
131  @Test
132  public void testBBSScan() throws Exception {
133    final Admin admin = TEST_UTIL.getAdmin();
134    final String userName = User.getCurrent().getShortName();
135    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
136    Table table = admin.getConnection().getTable(TABLE_NAME);
137
138    doPuts(10_000, FAMILY, QUALIFIER, table);
139    TEST_UTIL.flush(TABLE_NAME);
140
141    // Add 1 block/sec limit.
142    // This should only allow 1 scan per minute, because we estimate 1 block per scan
143    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
144      TimeUnit.SECONDS));
145    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
146    waitMinuteQuota();
147
148    // should execute 1 request
149    testTraffic(() -> doScans(5, table, 1), 1, 0);
150
151    // Remove all the limits
152    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
153    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
154    testTraffic(() -> doScans(100, table, 1), 100, 0);
155    testTraffic(() -> doScans(100, table, 1), 100, 0);
156
157    // Add ~3 block/sec limit. This should support >1 scans
158    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
159      Math.round(3.1 * blockSize), TimeUnit.SECONDS));
160    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
161    waitMinuteQuota();
162
163    // Add 50 block/sec limit. This should support >1 scans
164    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
165      Math.round(50.1 * blockSize), TimeUnit.SECONDS));
166    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
167    waitMinuteQuota();
168
169    // This will produce some throttling exceptions, but all/most should succeed within the timeout
170    testTraffic(() -> doScans(100, table, 1), 75, 25);
171    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
172    waitMinuteQuota();
173
174    // With large caching, a big scan should succeed
175    testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0);
176    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
177    waitMinuteQuota();
178
179    // Remove all the limits
180    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
181    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
182    testTraffic(() -> doScans(100, table, 1), 100, 0);
183    testTraffic(() -> doScans(100, table, 1), 100, 0);
184  }
185
186  @Test
187  public void testSmallScanNeverBlockedByLargeEstimate() throws Exception {
188    final Admin admin = TEST_UTIL.getAdmin();
189    final String userName = User.getCurrent().getShortName();
190    Table table = admin.getConnection().getTable(TABLE_NAME);
191
192    doPuts(10_000, FAMILY, QUALIFIER, table);
193    TEST_UTIL.flush(TABLE_NAME);
194
195    // Add 99MB/sec limit.
196    // This should never be blocked, but with a sequence number approaching 10k, without
197    // other intervention, we would estimate a scan workload approaching 625MB or the
198    // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all
199    // requests succeed, so the estimate never becomes large enough to cause read downtime
200    long limit = 99 * 1024 * 1024;
201    assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code
202                                                  // changes
203    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit,
204      TimeUnit.SECONDS));
205    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
206    waitMinuteQuota();
207
208    // should execute all requests
209    testTraffic(() -> doScans(10_000, table, 1), 10_000, 0);
210    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
211    waitMinuteQuota();
212
213    // Remove all the limits
214    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
215    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
216    testTraffic(() -> doScans(100, table, 1), 100, 0);
217    testTraffic(() -> doScans(100, table, 1), 100, 0);
218  }
219
220  @Test
221  public void testBBSMultiGet() throws Exception {
222    final Admin admin = TEST_UTIL.getAdmin();
223    final String userName = User.getCurrent().getShortName();
224    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
225    Table table = admin.getConnection().getTable(TABLE_NAME);
226    int rowCount = 10_000;
227
228    doPuts(rowCount, FAMILY, QUALIFIER, table);
229    TEST_UTIL.flush(TABLE_NAME);
230
231    // Add 1 block/sec limit.
232    // This should only allow 1 multiget per minute, because we estimate 1 block per multiget
233    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
234      TimeUnit.SECONDS));
235    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
236    waitMinuteQuota();
237
238    // should execute 1 request
239    testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1);
240
241    // Remove all the limits
242    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
243    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
244    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
245    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
246
247    // Add ~100 block/sec limit
248    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
249      Math.round(100.1 * blockSize), TimeUnit.SECONDS));
250    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
251
252    // should execute approximately 10 batches of 10 requests
253    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
254
255    // wait a minute and you should get another ~10 batches of 10 requests
256    waitMinuteQuota();
257    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
258
259    // Remove all the limits
260    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
261    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
262    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
263    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
264  }
265
266  private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
267    throws Exception {
268    TEST_UTIL.waitFor(5_000, () -> {
269      long actualSuccess;
270      try {
271        actualSuccess = trafficCallable.call();
272      } catch (Exception e) {
273        throw new RuntimeException(e);
274      }
275      LOG.info("Traffic test yielded {} successful requests. Expected {} +/- {}", actualSuccess,
276        expectedSuccess, marginOfError);
277      boolean success = (actualSuccess >= expectedSuccess - marginOfError)
278        && (actualSuccess <= expectedSuccess + marginOfError);
279      if (!success) {
280        triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
281        waitMinuteQuota();
282      }
283      return success;
284    });
285  }
286}