/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;

/**
 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster.
 * <p>
 * A single-node mini cluster is started once for the whole class with
 * {@link SpaceQuotaSnapshotNotifierForTest} configured as the snapshot notifier, so that the
 * tests can inspect the snapshots the master produces. Before every test all quotas are removed
 * and the notifier's recorded snapshots are cleared.
 */
@Tag(LargeTests.TAG)
public class TestQuotaObserverChoreWithMiniCluster {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final AtomicLong COUNTER = new AtomicLong(0);
  /** Pause, in milliseconds, between successive polls of the snapshot notifier. */
  private static final long DEFAULT_WAIT_MILLIS = 500;

  private HMaster master;
  private QuotaObserverChore chore;
  private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
  private SpaceQuotaHelperForTests helper;

  /**
   * Applies the quota test configuration, registers the test snapshot notifier and starts a
   * one-node mini cluster shared by every test in this class.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
      SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
    TEST_UTIL.startMiniCluster(1);
  }

  /** Shuts down the shared mini cluster. */
  @AfterAll
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Resets quota state before each test: waits for the quota table if it does not exist yet,
   * otherwise removes every defined quota, then clears the snapshots recorded by the test
   * notifier and caches the master and its {@link QuotaObserverChore}.
   * @param testInfo supplies the name of the running test method to the helper
   */
  @BeforeEach
  public void removeAllQuotas(TestInfo testInfo) throws Exception {
    final Connection conn = TEST_UTIL.getConnection();
    if (helper == null) {
      helper = new SpaceQuotaHelperForTests(TEST_UTIL,
        () -> testInfo.getTestMethod().get().getName(), COUNTER);
    }
    // The caller owns the Admin returned by Connection#getAdmin(), so close it once we are done.
    final boolean quotaTableExists;
    try (Admin admin = conn.getAdmin()) {
      quotaTableExists = admin.tableExists(QuotaUtil.QUOTA_TABLE_NAME);
    }
    if (!quotaTableExists) {
      // Wait for the quota table to be created
      helper.waitForQuotaTable(conn);
    } else {
      // Or, clean up any quotas from previous test runs.
      helper.removeAllQuotas(conn);
      assertEquals(0, helper.listNumDefinedQuotas(conn));
    }

    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
    snapshotNotifier = (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier();
    assertNotNull(snapshotNotifier);
    snapshotNotifier.clearSnapshots();
    chore = master.getQuotaObserverChore();
  }

  /**
   * Sets a 2MB table space quota, writes 3MB, and polls until the single reported snapshot for
   * the table is in violation; then verifies its policy, limit and usage.
   */
  @Test
  public void testTableViolatesQuota() throws Exception {
    TableName tn = helper.createTableWithRegions(10);

    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
    TEST_UTIL.getAdmin().setQuota(settings);

    // Write more data than should be allowed
    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);

    Map<TableName, SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots();
    boolean foundSnapshot = false;
    while (!foundSnapshot) {
      if (quotaSnapshots.isEmpty()) {
        LOG.info("Found no violated quotas, sleeping and retrying. Current reports: {}",
          master.getMasterQuotaManager().snapshotRegionSizes());
        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
        quotaSnapshots = snapshotNotifier.copySnapshots();
      } else {
        Entry<TableName, SpaceQuotaSnapshot> entry =
          Iterables.getOnlyElement(quotaSnapshots.entrySet());
        assertEquals(tn, entry.getKey());
        final SpaceQuotaSnapshot snapshot = entry.getValue();
        if (!snapshot.getQuotaStatus().isInViolation()) {
          LOG.info("Found a snapshot, but it was not yet in violation. {}", snapshot);
          sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
          quotaSnapshots = snapshotNotifier.copySnapshots();
        } else {
          foundSnapshot = true;
        }
      }
    }

    Entry<TableName, SpaceQuotaSnapshot> entry =
      Iterables.getOnlyElement(quotaSnapshots.entrySet());
    assertEquals(tn, entry.getKey());
    final SpaceQuotaSnapshot snapshot = entry.getValue();
    assertEquals(violationPolicy, snapshot.getQuotaStatus().getPolicy().get(),
      "Snapshot was " + snapshot);
    assertEquals(sizeLimit, snapshot.getLimit());
    assertTrue(snapshot.getUsage() > snapshot.getLimit(),
      "The usage should be greater than the limit, but were " + snapshot.getUsage() + " and "
        + snapshot.getLimit() + ", respectively");
  }

  /**
   * Sets a 5MB namespace space quota over three tables and writes 2MB to each in turn. No
   * violation may be reported after 2MB and 4MB; after 6MB all three tables must be reported in
   * violation with the namespace's policy, and no other table may be.
   */
  @Test
  public void testNamespaceViolatesQuota(TestInfo testInfo) throws Exception {
    final String namespace = testInfo.getTestMethod().get().getName();
    final Admin admin = TEST_UTIL.getAdmin();
    ensureNamespaceExists(admin, namespace);

    TableName tn1 = helper.createTableWithRegions(namespace, 5);
    TableName tn2 = helper.createTableWithRegions(namespace, 5);
    TableName tn3 = helper.createTableWithRegions(namespace, 5);

    final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
    QuotaSettings settings =
      QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
    admin.setQuota(settings);

    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
    admin.flush(tn1);
    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
    for (int i = 0; i < 5; i++) {
      // Check a few times to make sure we don't prematurely move to violation
      assertEquals(0, numSnapshotsInViolation(snapshots),
        "Should not see any quota violations after writing 2MB of data");
      sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
      snapshots = snapshotNotifier.copySnapshots();
    }

    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
    admin.flush(tn2);
    snapshots = snapshotNotifier.copySnapshots();
    for (int i = 0; i < 5; i++) {
      // Check a few times to make sure we don't prematurely move to violation
      assertEquals(0, numSnapshotsInViolation(snapshots),
        "Should not see any quota violations after writing 4MB of data");
      sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
      snapshots = snapshotNotifier.copySnapshots();
    }

    // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
    // and should push all three tables in the namespace into violation.
    helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
    admin.flush(tn3);
    snapshots = waitForViolations(3);

    SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
    assertNotNull(snapshot1, "tn1 should be in violation");
    assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get());
    SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
    assertNotNull(snapshot2, "tn2 should be in violation");
    assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get());
    SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
    assertNotNull(snapshot3, "tn3 should be in violation");
    assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get());
    assertTrue(snapshots.isEmpty(), "Unexpected additional quota violations: " + snapshots);
  }

  /**
   * Puts two tables into violation of a 3MB namespace quota, then adds a 1MB table quota with a
   * different policy on the first table. The first table must eventually report the table
   * quota's policy while the second keeps reporting the namespace quota's policy.
   */
  @Test
  public void testTableQuotaOverridesNamespaceQuota(TestInfo testInfo) throws Exception {
    final String namespace = testInfo.getTestMethod().get().getName();
    final Admin admin = TEST_UTIL.getAdmin();
    ensureNamespaceExists(admin, namespace);

    TableName tn1 = helper.createTableWithRegions(namespace, 5);
    TableName tn2 = helper.createTableWithRegions(namespace, 5);

    final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
    QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
      namespaceSizeLimit, namespaceViolationPolicy);
    admin.setQuota(namespaceSettings);

    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
    admin.flush(tn1);
    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
    for (int i = 0; i < 5; i++) {
      // Check a few times to make sure we don't prematurely move to violation
      assertEquals(0, numSnapshotsInViolation(snapshots),
        "Should not see any quota violations after writing 2MB of data: " + snapshots);
      sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
      snapshots = snapshotNotifier.copySnapshots();
    }

    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
    admin.flush(tn2);
    snapshots = waitForViolations(2);

    SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
    assertNotNull(actualPolicyTN1, "Expected to see violation policy for tn1");
    assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get());
    SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
    assertNotNull(actualPolicyTN2, "Expected to see violation policy for tn2");
    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());

    // Override the namespace quota with a table quota
    final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
    QuotaSettings tableSettings =
      QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, tableViolationPolicy);
    admin.setQuota(tableSettings);

    // Keep checking for the table quota policy to override the namespace quota
    while (true) {
      snapshots = snapshotNotifier.copySnapshots();
      SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
      assertNotNull(actualTableSnapshot, "Violation policy should never be null");
      if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) {
        LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
        continue;
      }
      assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get());
      break;
    }

    // This should not change with the introduction of the table quota for tn1
    actualPolicyTN2 = snapshots.get(tn2);
    assertNotNull(actualPolicyTN2, "Expected to see violation policy for tn2");
    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
  }

  /**
   * Verifies that the chore finds exactly the tables the helper created with table-level and
   * namespace-level space quotas.
   */
  @Test
  public void testGetAllTablesWithQuotas() throws Exception {
    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
    Set<TableName> tablesWithQuotas = new HashSet<>();
    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
    // Partition the tables with quotas by table and ns quota
    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);

    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
    assertEquals(tablesWithQuotas, tables.getTableQuotaTables(), "Found tables: " + tables);
    assertEquals(namespaceTablesWithQuotas, tables.getNamespaceQuotaTables(),
      "Found tables: " + tables);
  }

  /**
   * Verifies that a table which only has an RPC throttle quota is not returned among the tables
   * with space quotas.
   */
  @Test
  public void testRpcQuotaTablesAreFiltered() throws Exception {
    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
    Set<TableName> tablesWithQuotas = new HashSet<>();
    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
    // Partition the tables with quotas by table and ns quota
    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);

    TableName rpcQuotaTable = helper.createTable();
    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.throttleTable(rpcQuotaTable,
      ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));

    // The `rpcQuotaTable` should not be included in this Set
    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
    assertEquals(tablesWithQuotas, tables.getTableQuotaTables(), "Found tables: " + tables);
    assertEquals(namespaceTablesWithQuotas, tables.getNamespaceQuotaTables(),
      "Found tables: " + tables);
  }

  /**
   * Verifies that tables with too few reported regions are filtered out: of three 20-region
   * tables reporting 10, 19 and 20 regions, only the latter two must remain.
   */
  @Test
  public void testFilterRegions() throws Exception {
    Map<TableName, Integer> mockReportedRegions = new HashMap<>();
    // Can't mock because of primitive int as a return type -- Mockito
    // can only handle an Integer.
    TablesWithQuotas tables =
      new TablesWithQuotas(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()) {
        @Override
        int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) {
          // Tables without a mocked entry are treated as having reported no regions.
          return mockReportedRegions.getOrDefault(table, 0);
        }
      };

    // Create the tables
    TableName tn1 = helper.createTableWithRegions(20);
    TableName tn2 = helper.createTableWithRegions(20);
    TableName tn3 = helper.createTableWithRegions(20);

    // Add them to the Tables with Quotas object
    tables.addTableQuotaTable(tn1);
    tables.addTableQuotaTable(tn2);
    tables.addTableQuotaTable(tn3);

    // Mock the number of regions reported
    mockReportedRegions.put(tn1, 10); // 50%
    mockReportedRegions.put(tn2, 19); // 95%
    mockReportedRegions.put(tn3, 20); // 100%

    // Argument is un-used
    tables.filterInsufficientlyReportedTables(null);
    // The default of 95% reported should prevent tn1 from appearing
    assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
  }

  /**
   * Verifies that the chore's snapshot stores return, for every table created with a space
   * quota, the same quota that was set, and return {@code null} for a table without one.
   */
  @Test
  public void testFetchSpaceQuota() throws Exception {
    Multimap<TableName, QuotaSettings> tables = helper.createTablesWithSpaceQuotas();
    // Can pass in an empty map, we're not consulting it.
    chore.initializeSnapshotStores(Collections.emptyMap());
    // All tables that were created should have a quota defined.
    for (Entry<TableName, QuotaSettings> entry : tables.entries()) {
      final TableName table = entry.getKey();
      final QuotaSettings qs = entry.getValue();

      assertTrue(qs instanceof SpaceLimitSettings,
        "QuotaSettings was an instance of " + qs.getClass());

      SpaceQuota spaceQuota = null;
      if (qs.getTableName() != null) {
        spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
        assertNotNull(spaceQuota, "Could not find table space quota for " + table);
      } else if (qs.getNamespace() != null) {
        spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
        assertNotNull(spaceQuota,
          "Could not find namespace space quota for " + table.getNamespaceAsString());
      } else {
        fail("Expected table or namespace space quota");
      }

      final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
      assertEquals(sls.getProto().getQuota(), spaceQuota);
    }

    TableName tableWithoutQuota = helper.createTable();
    assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
  }

  /**
   * Creates the given namespace unless looking up its descriptor succeeds.
   * @param admin     the admin used to look up and, if needed, create the namespace
   * @param namespace the name of the namespace that must exist on return
   */
  private static void ensureNamespaceExists(Admin admin, String namespace) throws Exception {
    try {
      admin.getNamespaceDescriptor(namespace);
    } catch (NamespaceNotFoundException e) {
      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
      admin.createNamespace(desc);
    }
  }

  /**
   * Polls the snapshot notifier every {@link #DEFAULT_WAIT_MILLIS} until at least
   * {@code expected} snapshots are in violation. There is no timeout: this blocks for as long as
   * fewer violations are reported.
   * @param expected the minimum number of snapshots that must be in violation
   * @return the copy of the notifier's snapshots that satisfied the condition
   */
  private Map<TableName, SpaceQuotaSnapshot> waitForViolations(int expected) {
    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
    while (numSnapshotsInViolation(snapshots) < expected) {
      LOG.debug("Saw fewer violations than desired (expected {}): {}. Current reports: {}",
        expected, snapshots, master.getMasterQuotaManager().snapshotRegionSizes());
      sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
      snapshots = snapshotNotifier.copySnapshots();
    }
    return snapshots;
  }

  /**
   * Counts the snapshots whose quota status is in violation.
   * @param snapshots the snapshots to inspect, keyed by table
   * @return the number of values in {@code snapshots} that are in violation
   */
  private int numSnapshotsInViolation(Map<TableName, SpaceQuotaSnapshot> snapshots) {
    int sum = 0;
    for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
      if (snapshot.getQuotaStatus().isInViolation()) {
        sum++;
      }
    }
    return sum;
  }

  /**
   * Sleeps for the given time. If the thread is interrupted the sleep ends early and the
   * thread's interrupt flag is restored so that callers further up can still observe it.
   * @param millis how long to sleep, in milliseconds
   */
  private void sleepWithInterrupt(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping", e);
      Thread.currentThread().interrupt();
    }
  }
}