001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseTestingUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.CheckAndMutate;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Increment;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.RowMutations;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@Category({ RegionServerTests.class, MediumTests.class })
050public class TestAtomicReadQuota {
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
054  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
055  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
057  private static final byte[] FAMILY = Bytes.toBytes("cf");
058  private static final byte[] QUALIFIER = Bytes.toBytes("q");
059
060  @AfterClass
061  public static void tearDown() throws Exception {
062    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
063    EnvironmentEdgeManager.reset();
064    TEST_UTIL.deleteTable(TABLE_NAME);
065    TEST_UTIL.shutdownMiniCluster();
066  }
067
068  @BeforeClass
069  public static void setUpBeforeClass() throws Exception {
070    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
071    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
072    TEST_UTIL.startMiniCluster(1);
073    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
074    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
075    TEST_UTIL.waitTableAvailable(TABLE_NAME);
076  }
077
078  @Test
079  public void testIncrementCountedAgainstReadCapacity() throws Exception {
080    setupGenericQuota();
081
082    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
083    inc.addColumn(FAMILY, QUALIFIER, 1);
084    testThrottle(table -> table.increment(inc));
085  }
086
087  @Test
088  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
089    setupGenericQuota();
090
091    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
092    Increment inc = new Increment(row);
093    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
094    Put put = new Put(row);
095    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
096
097    RowMutations rowMutations = new RowMutations(row);
098    rowMutations.add(inc);
099    rowMutations.add(put);
100    testThrottle(table -> table.mutateRow(rowMutations));
101  }
102
103  @Test
104  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
105    setupGenericQuota();
106
107    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
108    Put put = new Put(row);
109    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
110
111    RowMutations rowMutations = new RowMutations(row);
112    rowMutations.add(put);
113    try (Table table = getTable()) {
114      for (int i = 0; i < 100; i++) {
115        table.mutateRow(rowMutations);
116      }
117    }
118  }
119
120  @Test
121  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
122    setupGenericQuota();
123    runNonAtomicPuts();
124  }
125
126  @Test
127  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
128    setupGenericQuota();
129    runNonAtomicPuts();
130  }
131
132  @Test
133  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
134    setupGenericQuota();
135
136    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
137    byte[] value = Bytes.toBytes("v");
138    Put put = new Put(row);
139    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
140    CheckAndMutate checkAndMutate =
141      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);
142
143    testThrottle(table -> table.checkAndMutate(checkAndMutate));
144  }
145
146  @Test
147  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
148    setupGenericQuota();
149
150    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
151    Increment inc = new Increment(row);
152    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
153
154    List<Increment> incs = new ArrayList<>(2);
155    incs.add(inc);
156    incs.add(inc);
157
158    testThrottle(table -> {
159      Object[] results = new Object[] {};
160      table.batch(incs, results);
161      return results;
162    });
163  }
164
165  @Test
166  public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception {
167    setupAtomicOnlyReqNumQuota();
168
169    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
170    Increment inc = new Increment(row);
171    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
172
173    List<Increment> incs = new ArrayList<>(2);
174    incs.add(inc);
175    incs.add(inc);
176
177    testThrottle(table -> {
178      Object[] results = new Object[] {};
179      table.batch(incs, results);
180      return results;
181    });
182  }
183
184  @Test
185  public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception {
186    setupAtomicOnlyReadSizeQuota();
187
188    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
189    Increment inc = new Increment(row);
190    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
191
192    List<Increment> incs = new ArrayList<>(2);
193    incs.add(inc);
194    incs.add(inc);
195
196    testThrottle(table -> {
197      Object[] results = new Object[] {};
198      table.batch(incs, results);
199      return results;
200    });
201  }
202
203  @Test
204  public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception {
205    setupAtomicOnlyReqNumQuota();
206    runNonAtomicPuts();
207  }
208
209  @Test
210  public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception {
211    setupAtomicOnlyReadSizeQuota();
212    runNonAtomicPuts();
213  }
214
215  @Test
216  public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception {
217    setupAtomicOnlyReqNumQuota();
218    runNonAtomicReads();
219  }
220
221  @Test
222  public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception {
223    setupAtomicOnlyReadSizeQuota();
224    runNonAtomicReads();
225  }
226
227  private void runNonAtomicPuts() throws Exception {
228    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
229    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
230    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
231    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
232
233    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
234    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
235
236    List<Put> puts = new ArrayList<>(2);
237    puts.add(put1);
238    puts.add(put2);
239
240    try (Table table = getTable()) {
241      for (int i = 0; i < 100; i++) {
242        table.put(puts);
243      }
244    }
245  }
246
247  private void runNonAtomicReads() throws Exception {
248    try (Table table = getTable()) {
249      byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
250      Get get = new Get(row);
251      table.get(get);
252    }
253  }
254
255  private void setupGenericQuota() throws Exception {
256    try (Admin admin = TEST_UTIL.getAdmin()) {
257      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
258        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
259    }
260    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
261  }
262
263  private void setupAtomicOnlyReqNumQuota() throws Exception {
264    try (Admin admin = TEST_UTIL.getAdmin()) {
265      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
266        ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES));
267    }
268    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
269  }
270
271  private void setupAtomicOnlyReadSizeQuota() throws Exception {
272    try (Admin admin = TEST_UTIL.getAdmin()) {
273      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
274        ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES));
275    }
276    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
277  }
278
279  private void cleanupQuota() throws Exception {
280    try (Admin admin = TEST_UTIL.getAdmin()) {
281      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
282    }
283    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
284  }
285
286  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
287    try (Table table = getTable()) {
288      // we have a read quota configured, so this should fail
289      TEST_UTIL.waitFor(60_000, () -> {
290        try {
291          request.run(table);
292          return false;
293        } catch (Exception e) {
294          boolean success = e.getCause() instanceof RpcThrottlingException;
295          if (!success) {
296            LOG.error("Unexpected exception", e);
297          }
298          return success;
299        }
300      });
301    } finally {
302      cleanupQuota();
303    }
304  }
305
306  private Table getTable() throws IOException {
307    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
308    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
309    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
310      .build();
311  }
312
313  @FunctionalInterface
314  private interface ThrowingFunction<I, O> {
315    O run(I input) throws Exception;
316  }
317
318}