001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Admin; 030import org.apache.hadoop.hbase.client.CheckAndMutate; 031import org.apache.hadoop.hbase.client.Get; 032import org.apache.hadoop.hbase.client.Increment; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.RowMutations; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.security.User; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.RegionServerTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ RegionServerTests.class, MediumTests.class }) 050public class TestAtomicReadQuota { 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestAtomicReadQuota.class); 054 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class); 055 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 056 private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); 057 private static final byte[] FAMILY = Bytes.toBytes("cf"); 058 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 059 060 @AfterClass 061 public static void tearDown() throws Exception { 062 ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); 063 EnvironmentEdgeManager.reset(); 064 TEST_UTIL.deleteTable(TABLE_NAME); 065 TEST_UTIL.shutdownMiniCluster(); 066 } 067 068 @BeforeClass 069 public static void setUpBeforeClass() throws Exception { 070 TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); 071 TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000); 072 TEST_UTIL.startMiniCluster(1); 073 TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); 074 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 075 TEST_UTIL.waitTableAvailable(TABLE_NAME); 076 } 077 078 @Test 079 public void testIncrementCountedAgainstReadCapacity() throws Exception { 080 setupGenericQuota(); 081 082 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 083 inc.addColumn(FAMILY, QUALIFIER, 1); 084 testThrottle(table -> table.increment(inc)); 085 } 086 087 @Test 088 public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception { 089 setupGenericQuota(); 090 091 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 092 Increment inc = new Increment(row); 093 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 094 Put put = new Put(row); 095 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 096 097 RowMutations rowMutations = new RowMutations(row); 098 rowMutations.add(inc); 099 rowMutations.add(put); 100 testThrottle(table -> table.mutateRow(rowMutations)); 101 } 102 103 @Test 104 public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception { 105 setupGenericQuota(); 106 107 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 108 Put put = new Put(row); 109 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 110 111 RowMutations rowMutations = new RowMutations(row); 112 rowMutations.add(put); 113 try (Table table = getTable()) { 114 for (int i = 0; i < 100; i++) { 115 table.mutateRow(rowMutations); 116 } 117 } 118 } 119 120 @Test 121 public void testNonAtomicPutOmittedFromReadCapacity() throws Exception { 122 setupGenericQuota(); 123 runNonAtomicPuts(); 124 } 125 126 @Test 127 public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception { 128 setupGenericQuota(); 129 runNonAtomicPuts(); 130 } 131 132 @Test 133 public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { 134 setupGenericQuota(); 135 136 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 137 byte[] value = Bytes.toBytes("v"); 138 Put put = new Put(row); 139 put.addColumn(FAMILY, Bytes.toBytes("doot"), value); 140 CheckAndMutate checkAndMutate = 141 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put); 142 143 testThrottle(table -> table.checkAndMutate(checkAndMutate)); 144 } 145 146 @Test 147 public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { 148 setupGenericQuota(); 149 150 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 151 Increment inc = new Increment(row); 152 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 153 154 List<Increment> incs = new ArrayList<>(2); 155 incs.add(inc); 156 incs.add(inc); 157 158 testThrottle(table -> { 159 Object[] results = new Object[] {}; 160 table.batch(incs, results); 161 return results; 162 }); 163 } 164 165 @Test 166 public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception { 167 setupAtomicOnlyReqNumQuota(); 168 169 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 170 Increment inc = new Increment(row); 171 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 172 173 List<Increment> incs = new ArrayList<>(2); 174 incs.add(inc); 175 incs.add(inc); 176 177 testThrottle(table -> { 178 Object[] results = new Object[] {}; 179 table.batch(incs, results); 180 return results; 181 }); 182 } 183 184 @Test 185 public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception { 186 setupAtomicOnlyReadSizeQuota(); 187 188 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 189 Increment inc = new Increment(row); 190 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 191 192 List<Increment> incs = new ArrayList<>(2); 193 incs.add(inc); 194 incs.add(inc); 195 196 testThrottle(table -> { 197 Object[] results = new Object[] {}; 198 table.batch(incs, results); 199 return results; 200 }); 201 } 202 203 @Test 204 public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception { 205 setupAtomicOnlyReqNumQuota(); 206 runNonAtomicPuts(); 207 } 208 209 @Test 210 public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception { 211 setupAtomicOnlyReadSizeQuota(); 212 runNonAtomicPuts(); 213 } 214 215 @Test 216 public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception { 217 setupAtomicOnlyReqNumQuota(); 218 runNonAtomicReads(); 219 } 220 221 @Test 222 public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception { 223 setupAtomicOnlyReadSizeQuota(); 224 runNonAtomicReads(); 225 } 226 227 private void runNonAtomicPuts() throws Exception { 228 Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 229 put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 230 Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 231 put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 232 233 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 234 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 235 236 List<Put> puts = new ArrayList<>(2); 237 puts.add(put1); 238 puts.add(put2); 239 240 try (Table table = getTable()) { 241 for (int i = 0; i < 100; i++) { 242 table.put(puts); 243 } 244 } 245 } 246 247 private void runNonAtomicReads() throws Exception { 248 try (Table table = getTable()) { 249 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 250 Get get = new Get(row); 251 table.get(get); 252 } 253 } 254 255 private void setupGenericQuota() throws Exception { 256 try (Admin admin = TEST_UTIL.getAdmin()) { 257 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 258 ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES)); 259 } 260 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 261 } 262 263 private void setupAtomicOnlyReqNumQuota() throws Exception { 264 try (Admin admin = TEST_UTIL.getAdmin()) { 265 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 266 ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES)); 267 } 268 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 269 } 270 271 private void setupAtomicOnlyReadSizeQuota() throws Exception { 272 try (Admin admin = TEST_UTIL.getAdmin()) { 273 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 274 ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES)); 275 } 276 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 277 } 278 279 private void cleanupQuota() throws Exception { 280 try (Admin admin = TEST_UTIL.getAdmin()) { 281 admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName())); 282 } 283 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 284 } 285 286 private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception { 287 try (Table table = getTable()) { 288 // we have a read quota configured, so this should fail 289 TEST_UTIL.waitFor(60_000, () -> { 290 try { 291 request.run(table); 292 return false; 293 } catch (Exception e) { 294 boolean success = e.getCause() instanceof RpcThrottlingException; 295 if (!success) { 296 LOG.error("Unexpected exception", e); 297 } 298 return success; 299 } 300 }); 301 } finally { 302 cleanupQuota(); 303 } 304 } 305 306 private Table getTable() throws IOException { 307 TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); 308 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 309 return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) 310 .build(); 311 } 312 313 @FunctionalInterface 314 private interface ThrowingFunction<I, O> { 315 O run(I input) throws Exception; 316 } 317 318}