001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
046
047@Tag(RegionServerTests.TAG)
048@Tag(MediumTests.TAG)
049public class TestAtomicReadQuota {
050  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
051  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
052  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
053  private static final byte[] FAMILY = Bytes.toBytes("cf");
054  private static final byte[] QUALIFIER = Bytes.toBytes("q");
055
056  @AfterAll
057  public static void tearDown() throws Exception {
058    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
059    EnvironmentEdgeManager.reset();
060    TEST_UTIL.deleteTable(TABLE_NAME);
061    TEST_UTIL.shutdownMiniCluster();
062  }
063
064  @BeforeAll
065  public static void setUpBeforeClass() throws Exception {
066    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
067    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
068    TEST_UTIL.startMiniCluster(1);
069    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
070    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
071    TEST_UTIL.waitTableAvailable(TABLE_NAME);
072  }
073
074  @Test
075  public void testIncrementCountedAgainstReadCapacity() throws Exception {
076    setupGenericQuota();
077
078    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
079    inc.addColumn(FAMILY, QUALIFIER, 1);
080    testThrottle(table -> table.increment(inc));
081  }
082
083  @Test
084  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
085    setupGenericQuota();
086
087    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
088    Increment inc = new Increment(row);
089    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
090    Put put = new Put(row);
091    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
092
093    RowMutations rowMutations = new RowMutations(row);
094    rowMutations.add(inc);
095    rowMutations.add(put);
096    testThrottle(table -> table.mutateRow(rowMutations));
097  }
098
099  @Test
100  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
101    setupGenericQuota();
102
103    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
104    Put put = new Put(row);
105    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
106
107    RowMutations rowMutations = new RowMutations(row);
108    rowMutations.add(put);
109    try (Table table = getTable()) {
110      for (int i = 0; i < 100; i++) {
111        table.mutateRow(rowMutations);
112      }
113    }
114  }
115
116  @Test
117  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
118    setupGenericQuota();
119    runNonAtomicPuts();
120  }
121
122  @Test
123  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
124    setupGenericQuota();
125    runNonAtomicPuts();
126  }
127
128  @Test
129  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
130    setupGenericQuota();
131
132    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
133    byte[] value = Bytes.toBytes("v");
134    Put put = new Put(row);
135    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
136    CheckAndMutate checkAndMutate =
137      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);
138
139    testThrottle(table -> table.checkAndMutate(checkAndMutate));
140  }
141
142  @Test
143  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
144    setupGenericQuota();
145
146    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
147    Increment inc = new Increment(row);
148    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
149
150    List<Increment> incs = new ArrayList<>(2);
151    incs.add(inc);
152    incs.add(inc);
153
154    testThrottle(table -> {
155      Object[] results = new Object[] {};
156      table.batch(incs, results);
157      return results;
158    });
159  }
160
161  @Test
162  public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception {
163    setupAtomicOnlyReqNumQuota();
164
165    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
166    Increment inc = new Increment(row);
167    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
168
169    List<Increment> incs = new ArrayList<>(2);
170    incs.add(inc);
171    incs.add(inc);
172
173    testThrottle(table -> {
174      Object[] results = new Object[] {};
175      table.batch(incs, results);
176      return results;
177    });
178  }
179
180  @Test
181  public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception {
182    setupAtomicOnlyReadSizeQuota();
183
184    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
185    Increment inc = new Increment(row);
186    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
187
188    List<Increment> incs = new ArrayList<>(2);
189    incs.add(inc);
190    incs.add(inc);
191
192    testThrottle(table -> {
193      Object[] results = new Object[] {};
194      table.batch(incs, results);
195      return results;
196    });
197  }
198
199  @Test
200  public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception {
201    setupAtomicOnlyReqNumQuota();
202    runNonAtomicPuts();
203  }
204
205  @Test
206  public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception {
207    setupAtomicOnlyReadSizeQuota();
208    runNonAtomicPuts();
209  }
210
211  @Test
212  public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception {
213    setupAtomicOnlyReqNumQuota();
214    runNonAtomicReads();
215  }
216
217  @Test
218  public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception {
219    setupAtomicOnlyReadSizeQuota();
220    runNonAtomicReads();
221  }
222
223  private void runNonAtomicPuts() throws Exception {
224    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
225    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
226    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
227    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
228
229    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
230    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
231
232    List<Put> puts = new ArrayList<>(2);
233    puts.add(put1);
234    puts.add(put2);
235
236    try (Table table = getTable()) {
237      for (int i = 0; i < 100; i++) {
238        table.put(puts);
239      }
240    }
241  }
242
243  private void runNonAtomicReads() throws Exception {
244    try (Table table = getTable()) {
245      byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
246      Get get = new Get(row);
247      table.get(get);
248    }
249  }
250
251  private void setupGenericQuota() throws Exception {
252    try (Admin admin = TEST_UTIL.getAdmin()) {
253      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
254        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
255    }
256    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
257  }
258
259  private void setupAtomicOnlyReqNumQuota() throws Exception {
260    try (Admin admin = TEST_UTIL.getAdmin()) {
261      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
262        ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES));
263    }
264    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
265  }
266
267  private void setupAtomicOnlyReadSizeQuota() throws Exception {
268    try (Admin admin = TEST_UTIL.getAdmin()) {
269      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
270        ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES));
271    }
272    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
273  }
274
275  private void cleanupQuota() throws Exception {
276    try (Admin admin = TEST_UTIL.getAdmin()) {
277      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
278    }
279    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
280  }
281
282  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
283    try (Table table = getTable()) {
284      // we have a read quota configured, so this should fail
285      TEST_UTIL.waitFor(60_000, () -> {
286        try {
287          request.run(table);
288          return false;
289        } catch (Exception e) {
290          boolean success = e.getCause() instanceof RpcThrottlingException;
291          if (!success) {
292            LOG.error("Unexpected exception", e);
293          }
294          return success;
295        }
296      });
297    } finally {
298      cleanupQuota();
299    }
300  }
301
302  private Table getTable() throws IOException {
303    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
304    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
305    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
306      .build();
307  }
308
309  @FunctionalInterface
310  private interface ThrowingFunction<I, O> {
311    O run(I input) throws Exception;
312  }
313
314}