001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.HBaseTestingUtil; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Admin; 029import org.apache.hadoop.hbase.client.CheckAndMutate; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Increment; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.RowMutations; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.hbase.testclassification.RegionServerTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.junit.jupiter.api.AfterAll; 041import org.junit.jupiter.api.BeforeAll; 042import org.junit.jupiter.api.Tag; 043import org.junit.jupiter.api.Test; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047@Tag(RegionServerTests.TAG) 048@Tag(MediumTests.TAG) 049public class TestAtomicReadQuota { 050 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class); 051 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 052 private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); 053 private static final byte[] FAMILY = Bytes.toBytes("cf"); 054 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 055 056 @AfterAll 057 public static void tearDown() throws Exception { 058 ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); 059 EnvironmentEdgeManager.reset(); 060 TEST_UTIL.deleteTable(TABLE_NAME); 061 TEST_UTIL.shutdownMiniCluster(); 062 } 063 064 @BeforeAll 065 public static void setUpBeforeClass() throws Exception { 066 TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); 067 TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000); 068 TEST_UTIL.startMiniCluster(1); 069 TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); 070 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 071 TEST_UTIL.waitTableAvailable(TABLE_NAME); 072 } 073 074 @Test 075 public void testIncrementCountedAgainstReadCapacity() throws Exception { 076 setupGenericQuota(); 077 078 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 079 inc.addColumn(FAMILY, QUALIFIER, 1); 080 testThrottle(table -> table.increment(inc)); 081 } 082 083 @Test 084 public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception { 085 setupGenericQuota(); 086 087 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 088 Increment inc = new Increment(row); 089 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 090 Put put = new Put(row); 091 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 092 093 RowMutations rowMutations = new RowMutations(row); 094 rowMutations.add(inc); 095 rowMutations.add(put); 096 testThrottle(table -> table.mutateRow(rowMutations)); 097 } 098 099 @Test 100 public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception { 101 setupGenericQuota(); 102 103 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 104 Put put = new Put(row); 105 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 106 107 RowMutations rowMutations = new RowMutations(row); 108 rowMutations.add(put); 109 try (Table table = getTable()) { 110 for (int i = 0; i < 100; i++) { 111 table.mutateRow(rowMutations); 112 } 113 } 114 } 115 116 @Test 117 public void testNonAtomicPutOmittedFromReadCapacity() throws Exception { 118 setupGenericQuota(); 119 runNonAtomicPuts(); 120 } 121 122 @Test 123 public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception { 124 setupGenericQuota(); 125 runNonAtomicPuts(); 126 } 127 128 @Test 129 public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { 130 setupGenericQuota(); 131 132 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 133 byte[] value = Bytes.toBytes("v"); 134 Put put = new Put(row); 135 put.addColumn(FAMILY, Bytes.toBytes("doot"), value); 136 CheckAndMutate checkAndMutate = 137 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put); 138 139 testThrottle(table -> table.checkAndMutate(checkAndMutate)); 140 } 141 142 @Test 143 public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { 144 setupGenericQuota(); 145 146 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 147 Increment inc = new Increment(row); 148 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 149 150 List<Increment> incs = new ArrayList<>(2); 151 incs.add(inc); 152 incs.add(inc); 153 154 testThrottle(table -> { 155 Object[] results = new Object[] {}; 156 table.batch(incs, results); 157 return results; 158 }); 159 } 160 161 @Test 162 public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception { 163 setupAtomicOnlyReqNumQuota(); 164 165 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 166 Increment inc = new Increment(row); 167 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 168 169 List<Increment> incs = new ArrayList<>(2); 170 incs.add(inc); 171 incs.add(inc); 172 173 testThrottle(table -> { 174 Object[] results = new Object[] {}; 175 table.batch(incs, results); 176 return results; 177 }); 178 } 179 180 @Test 181 public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception { 182 setupAtomicOnlyReadSizeQuota(); 183 184 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 185 Increment inc = new Increment(row); 186 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 187 188 List<Increment> incs = new ArrayList<>(2); 189 incs.add(inc); 190 incs.add(inc); 191 192 testThrottle(table -> { 193 Object[] results = new Object[] {}; 194 table.batch(incs, results); 195 return results; 196 }); 197 } 198 199 @Test 200 public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception { 201 setupAtomicOnlyReqNumQuota(); 202 runNonAtomicPuts(); 203 } 204 205 @Test 206 public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception { 207 setupAtomicOnlyReadSizeQuota(); 208 runNonAtomicPuts(); 209 } 210 211 @Test 212 public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception { 213 setupAtomicOnlyReqNumQuota(); 214 runNonAtomicReads(); 215 } 216 217 @Test 218 public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception { 219 setupAtomicOnlyReadSizeQuota(); 220 runNonAtomicReads(); 221 } 222 223 private void runNonAtomicPuts() throws Exception { 224 Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 225 put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 226 Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 227 put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 228 229 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 230 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 231 232 List<Put> puts = new ArrayList<>(2); 233 puts.add(put1); 234 puts.add(put2); 235 236 try (Table table = getTable()) { 237 for (int i = 0; i < 100; i++) { 238 table.put(puts); 239 } 240 } 241 } 242 243 private void runNonAtomicReads() throws Exception { 244 try (Table table = getTable()) { 245 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 246 Get get = new Get(row); 247 table.get(get); 248 } 249 } 250 251 private void setupGenericQuota() throws Exception { 252 try (Admin admin = TEST_UTIL.getAdmin()) { 253 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 254 ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES)); 255 } 256 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 257 } 258 259 private void setupAtomicOnlyReqNumQuota() throws Exception { 260 try (Admin admin = TEST_UTIL.getAdmin()) { 261 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 262 ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES)); 263 } 264 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 265 } 266 267 private void setupAtomicOnlyReadSizeQuota() throws Exception { 268 try (Admin admin = TEST_UTIL.getAdmin()) { 269 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 270 ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES)); 271 } 272 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 273 } 274 275 private void cleanupQuota() throws Exception { 276 try (Admin admin = TEST_UTIL.getAdmin()) { 277 admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName())); 278 } 279 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 280 } 281 282 private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception { 283 try (Table table = getTable()) { 284 // we have a read quota configured, so this should fail 285 TEST_UTIL.waitFor(60_000, () -> { 286 try { 287 request.run(table); 288 return false; 289 } catch (Exception e) { 290 boolean success = e.getCause() instanceof RpcThrottlingException; 291 if (!success) { 292 LOG.error("Unexpected exception", e); 293 } 294 return success; 295 } 296 }); 297 } finally { 298 cleanupQuota(); 299 } 300 } 301 302 private Table getTable() throws IOException { 303 TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); 304 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 305 return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) 306 .build(); 307 } 308 309 @FunctionalInterface 310 private interface ThrowingFunction<I, O> { 311 O run(I input) throws Exception; 312 } 313 314}