001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets;
021import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets;
022import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
023import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
024import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
025import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
026import static org.junit.Assert.assertTrue;
027
028import java.util.concurrent.Callable;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.junit.After;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050@Category({ RegionServerTests.class, MediumTests.class })
051public class TestBlockBytesScannedQuota {
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestBlockBytesScannedQuota.class);
055
056  private final static Logger LOG = LoggerFactory.getLogger(TestBlockBytesScannedQuota.class);
057
058  private static final int REFRESH_TIME = 5000;
059  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060  private static final byte[] FAMILY = Bytes.toBytes("cf");
061  private static final byte[] QUALIFIER = Bytes.toBytes("q");
062
063  private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest");
064  private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024;
065
066  @BeforeClass
067  public static void setUpBeforeClass() throws Exception {
068    // client should fail fast
069    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
071    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
072      MAX_SCANNER_RESULT_SIZE);
073    TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY,
074      AverageIntervalRateLimiter.class, RateLimiter.class);
075
076    // quotas enabled, using block bytes scanned
077    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
078    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
079
080    // don't cache blocks to make IO predictable
081    TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
082
083    TEST_UTIL.startMiniCluster(1);
084    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
085    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
086    TEST_UTIL.waitTableAvailable(TABLE_NAME);
087  }
088
089  @AfterClass
090  public static void tearDownAfterClass() throws Exception {
091    EnvironmentEdgeManager.reset();
092    TEST_UTIL.deleteTable(TABLE_NAME);
093    TEST_UTIL.shutdownMiniCluster();
094  }
095
096  @After
097  public void tearDown() throws Exception {
098    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
099  }
100
101  @Test
102  public void testBBSGet() throws Exception {
103    final Admin admin = TEST_UTIL.getAdmin();
104    final String userName = User.getCurrent().getShortName();
105    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
106    Table table = admin.getConnection().getTable(TABLE_NAME);
107
108    doPuts(10_000, FAMILY, QUALIFIER, table);
109    TEST_UTIL.flush(TABLE_NAME);
110
111    // Add ~10 block/sec limit
112    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
113      Math.round(10.1 * blockSize), TimeUnit.SECONDS));
114    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
115
116    // should execute at max 10 requests
117    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
118
119    // wait a minute and you should get another 10 requests executed
120    waitMinuteQuota();
121    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
122
123    // Remove all the limits
124    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
125    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
126    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
127    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
128  }
129
130  @Test
131  public void testBBSScan() throws Exception {
132    final Admin admin = TEST_UTIL.getAdmin();
133    final String userName = User.getCurrent().getShortName();
134    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
135    Table table = admin.getConnection().getTable(TABLE_NAME);
136
137    doPuts(10_000, FAMILY, QUALIFIER, table);
138    TEST_UTIL.flush(TABLE_NAME);
139
140    // Add 1 block/sec limit.
141    // This should only allow 1 scan per minute, because we estimate 1 block per scan
142    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
143      TimeUnit.SECONDS));
144    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
145    waitMinuteQuota();
146
147    // should execute 1 request
148    testTraffic(() -> doScans(5, table, 1), 1, 0);
149
150    // Remove all the limits
151    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
152    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
153    testTraffic(() -> doScans(100, table, 1), 100, 0);
154    testTraffic(() -> doScans(100, table, 1), 100, 0);
155
156    // Add ~3 block/sec limit. This should support >1 scans
157    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
158      Math.round(3.1 * blockSize), TimeUnit.SECONDS));
159    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
160    waitMinuteQuota();
161
162    // Add 50 block/sec limit. This should support >1 scans
163    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
164      Math.round(50.1 * blockSize), TimeUnit.SECONDS));
165    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
166    waitMinuteQuota();
167
168    // This will produce some throttling exceptions, but all/most should succeed within the timeout
169    testTraffic(() -> doScans(100, table, 1), 75, 25);
170    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
171    waitMinuteQuota();
172
173    // With large caching, a big scan should succeed
174    testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0);
175    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
176    waitMinuteQuota();
177
178    // Remove all the limits
179    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
180    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
181    testTraffic(() -> doScans(100, table, 1), 100, 0);
182    testTraffic(() -> doScans(100, table, 1), 100, 0);
183  }
184
185  @Test
186  public void testSmallScanNeverBlockedByLargeEstimate() throws Exception {
187    final Admin admin = TEST_UTIL.getAdmin();
188    final String userName = User.getCurrent().getShortName();
189    Table table = admin.getConnection().getTable(TABLE_NAME);
190
191    doPuts(10_000, FAMILY, QUALIFIER, table);
192    TEST_UTIL.flush(TABLE_NAME);
193
194    // Add 99MB/sec limit.
195    // This should never be blocked, but with a sequence number approaching 10k, without
196    // other intervention, we would estimate a scan workload approaching 625MB or the
197    // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all
198    // requests succeed, so the estimate never becomes large enough to cause read downtime
199    long limit = 99 * 1024 * 1024;
200    assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code
201                                                  // changes
202    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit,
203      TimeUnit.SECONDS));
204    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
205    waitMinuteQuota();
206
207    // should execute all requests
208    testTraffic(() -> doScans(10_000, table, 1), 10_000, 0);
209    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
210    waitMinuteQuota();
211
212    // Remove all the limits
213    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
214    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
215    testTraffic(() -> doScans(100, table, 1), 100, 0);
216    testTraffic(() -> doScans(100, table, 1), 100, 0);
217  }
218
219  @Test
220  public void testBBSMultiGet() throws Exception {
221    final Admin admin = TEST_UTIL.getAdmin();
222    final String userName = User.getCurrent().getShortName();
223    int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
224    Table table = admin.getConnection().getTable(TABLE_NAME);
225    int rowCount = 10_000;
226
227    doPuts(rowCount, FAMILY, QUALIFIER, table);
228    TEST_UTIL.flush(TABLE_NAME);
229
230    // Add 1 block/sec limit.
231    // This should only allow 1 multiget per minute, because we estimate 1 block per multiget
232    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
233      TimeUnit.SECONDS));
234    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
235    waitMinuteQuota();
236
237    // should execute 1 request
238    testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1);
239
240    // Remove all the limits
241    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
242    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
243    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
244    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
245
246    // Add ~100 block/sec limit
247    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
248      Math.round(100.1 * blockSize), TimeUnit.SECONDS));
249    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
250
251    // should execute approximately 10 batches of 10 requests
252    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
253
254    // wait a minute and you should get another ~10 batches of 10 requests
255    waitMinuteQuota();
256    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
257
258    // Remove all the limits
259    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
260    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
261    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
262    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
263  }
264
265  private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
266    throws Exception {
267    TEST_UTIL.waitFor(5_000, () -> {
268      long actualSuccess;
269      try {
270        actualSuccess = trafficCallable.call();
271      } catch (Exception e) {
272        throw new RuntimeException(e);
273      }
274      LOG.info("Traffic test yielded {} successful requests. Expected {} +/- {}", actualSuccess,
275        expectedSuccess, marginOfError);
276      boolean success = (actualSuccess >= expectedSuccess - marginOfError)
277        && (actualSuccess <= expectedSuccess + marginOfError);
278      if (!success) {
279        triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
280        waitMinuteQuota();
281      }
282      return success;
283    });
284  }
285}