001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Map; 031import java.util.Map.Entry; 032import java.util.Set; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.NamespaceNotFoundException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Admin; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.master.HMaster; 044import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.junit.AfterClass; 047import org.junit.Before; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Rule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.junit.rules.TestName; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 058import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 061 062/** 063 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster. 064 */ 065@Category(LargeTests.class) 066public class TestQuotaObserverChoreWithMiniCluster { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestQuotaObserverChoreWithMiniCluster.class); 071 072 private static final Logger LOG = 073 LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class); 074 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 075 private static final AtomicLong COUNTER = new AtomicLong(0); 076 private static final long DEFAULT_WAIT_MILLIS = 500; 077 078 @Rule 079 public TestName testName = new TestName(); 080 081 private HMaster master; 082 private QuotaObserverChore chore; 083 private SpaceQuotaSnapshotNotifierForTest snapshotNotifier; 084 private SpaceQuotaHelperForTests helper; 085 086 @BeforeClass 087 public static void setUp() throws Exception { 088 Configuration conf = TEST_UTIL.getConfiguration(); 089 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 090 conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY, 091 SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class); 092 TEST_UTIL.startMiniCluster(1); 093 } 094 095 @AfterClass 096 public static void tearDown() throws Exception { 097 TEST_UTIL.shutdownMiniCluster(); 098 } 099 100 @Before 101 public void removeAllQuotas() throws Exception { 102 final Connection conn = TEST_UTIL.getConnection(); 103 if (helper == null) { 104 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 105 } 106 // Wait for the quota table to be created 107 if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { 108 helper.waitForQuotaTable(conn); 109 } else { 110 // Or, clean up any quotas from previous test runs. 111 helper.removeAllQuotas(conn); 112 assertEquals(0, helper.listNumDefinedQuotas(conn)); 113 } 114 115 master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 116 snapshotNotifier = 117 (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier(); 118 assertNotNull(snapshotNotifier); 119 snapshotNotifier.clearSnapshots(); 120 chore = master.getQuotaObserverChore(); 121 } 122 123 @Test 124 public void testTableViolatesQuota() throws Exception { 125 TableName tn = helper.createTableWithRegions(10); 126 127 final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 128 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS; 129 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy); 130 TEST_UTIL.getAdmin().setQuota(settings); 131 132 // Write more data than should be allowed 133 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 134 135 Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots(); 136 boolean foundSnapshot = false; 137 while (!foundSnapshot) { 138 if (quotaSnapshots.isEmpty()) { 139 LOG.info("Found no violated quotas, sleeping and retrying. Current reports: " 140 + master.getMasterQuotaManager().snapshotRegionSizes()); 141 sleepWithInterrupt(DEFAULT_WAIT_MILLIS); 142 quotaSnapshots = snapshotNotifier.copySnapshots(); 143 } else { 144 Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet()); 145 assertEquals(tn, entry.getKey()); 146 final SpaceQuotaSnapshot snapshot = entry.getValue(); 147 if (!snapshot.getQuotaStatus().isInViolation()) { 148 LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot); 149 sleepWithInterrupt(DEFAULT_WAIT_MILLIS); 150 quotaSnapshots = snapshotNotifier.copySnapshots(); 151 } else { 152 foundSnapshot = true; 153 } 154 } 155 } 156 157 Entry<TableName, SpaceQuotaSnapshot> entry = 158 Iterables.getOnlyElement(quotaSnapshots.entrySet()); 159 assertEquals(tn, entry.getKey()); 160 final SpaceQuotaSnapshot snapshot = entry.getValue(); 161 assertEquals("Snapshot was " + snapshot, violationPolicy, 162 snapshot.getQuotaStatus().getPolicy().get()); 163 assertEquals(sizeLimit, snapshot.getLimit()); 164 assertTrue("The usage should be greater than the limit, but were " + snapshot.getUsage() + 165 " and " + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit()); 166 } 167 168 @Test 169 public void testNamespaceViolatesQuota() throws Exception { 170 final String namespace = testName.getMethodName(); 171 final Admin admin = TEST_UTIL.getAdmin(); 172 // Ensure the namespace exists 173 try { 174 admin.getNamespaceDescriptor(namespace); 175 } catch (NamespaceNotFoundException e) { 176 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); 177 admin.createNamespace(desc); 178 } 179 180 TableName tn1 = helper.createTableWithRegions(namespace, 5); 181 TableName tn2 = helper.createTableWithRegions(namespace, 5); 182 TableName tn3 = helper.createTableWithRegions(namespace, 5); 183 184 final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 185 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE; 186 QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy); 187 admin.setQuota(settings); 188 189 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 190 admin.flush(tn1); 191 Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots(); 192 for (int i = 0; i < 5; i++) { 193 // Check a few times to make sure we don't prematurely move to violation 194 assertEquals( 195 "Should not see any quota violations after writing 2MB of data", 0, 196 numSnapshotsInViolation(snapshots)); 197 try { 198 Thread.sleep(DEFAULT_WAIT_MILLIS); 199 } catch (InterruptedException e) { 200 LOG.debug("Interrupted while sleeping." , e); 201 } 202 snapshots = snapshotNotifier.copySnapshots(); 203 } 204 205 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 206 admin.flush(tn2); 207 snapshots = snapshotNotifier.copySnapshots(); 208 for (int i = 0; i < 5; i++) { 209 // Check a few times to make sure we don't prematurely move to violation 210 assertEquals("Should not see any quota violations after writing 4MB of data", 0, 211 numSnapshotsInViolation(snapshots)); 212 try { 213 Thread.sleep(DEFAULT_WAIT_MILLIS); 214 } catch (InterruptedException e) { 215 LOG.debug("Interrupted while sleeping." , e); 216 } 217 snapshots = snapshotNotifier.copySnapshots(); 218 } 219 220 // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total) 221 // and should push all three tables in the namespace into violation. 222 helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 223 admin.flush(tn3); 224 snapshots = snapshotNotifier.copySnapshots(); 225 while (numSnapshotsInViolation(snapshots) < 3) { 226 LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots 227 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); 228 try { 229 Thread.sleep(DEFAULT_WAIT_MILLIS); 230 } catch (InterruptedException e) { 231 LOG.debug("Interrupted while sleeping.", e); 232 Thread.currentThread().interrupt(); 233 } 234 snapshots = snapshotNotifier.copySnapshots(); 235 } 236 237 SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1); 238 assertNotNull("tn1 should be in violation", snapshot1); 239 assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get()); 240 SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2); 241 assertNotNull("tn2 should be in violation", snapshot2); 242 assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get()); 243 SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3); 244 assertNotNull("tn3 should be in violation", snapshot3); 245 assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get()); 246 assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty()); 247 } 248 249 @Test 250 public void testTableQuotaOverridesNamespaceQuota() throws Exception { 251 final String namespace = testName.getMethodName(); 252 final Admin admin = TEST_UTIL.getAdmin(); 253 // Ensure the namespace exists 254 try { 255 admin.getNamespaceDescriptor(namespace); 256 } catch (NamespaceNotFoundException e) { 257 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); 258 admin.createNamespace(desc); 259 } 260 261 TableName tn1 = helper.createTableWithRegions(namespace, 5); 262 TableName tn2 = helper.createTableWithRegions(namespace, 5); 263 264 final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 265 final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE; 266 QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace, 267 namespaceSizeLimit, namespaceViolationPolicy); 268 admin.setQuota(namespaceSettings); 269 270 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 271 admin.flush(tn1); 272 Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots(); 273 for (int i = 0; i < 5; i++) { 274 // Check a few times to make sure we don't prematurely move to violation 275 assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0, 276 numSnapshotsInViolation(snapshots)); 277 try { 278 Thread.sleep(DEFAULT_WAIT_MILLIS); 279 } catch (InterruptedException e) { 280 LOG.debug("Interrupted while sleeping." , e); 281 } 282 snapshots = snapshotNotifier.copySnapshots(); 283 } 284 285 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 286 admin.flush(tn2); 287 snapshots = snapshotNotifier.copySnapshots(); 288 while (numSnapshotsInViolation(snapshots) < 2) { 289 LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots 290 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); 291 try { 292 Thread.sleep(DEFAULT_WAIT_MILLIS); 293 } catch (InterruptedException e) { 294 LOG.debug("Interrupted while sleeping.", e); 295 Thread.currentThread().interrupt(); 296 } 297 snapshots = snapshotNotifier.copySnapshots(); 298 } 299 300 SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1); 301 assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1); 302 assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get()); 303 SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2); 304 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); 305 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get()); 306 307 // Override the namespace quota with a table quota 308 final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE; 309 final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS; 310 QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, 311 tableViolationPolicy); 312 admin.setQuota(tableSettings); 313 314 // Keep checking for the table quota policy to override the namespace quota 315 while (true) { 316 snapshots = snapshotNotifier.copySnapshots(); 317 SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1); 318 assertNotNull("Violation policy should never be null", actualTableSnapshot); 319 if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) { 320 LOG.debug("Saw unexpected table violation policy, waiting and re-checking."); 321 try { 322 Thread.sleep(DEFAULT_WAIT_MILLIS); 323 } catch (InterruptedException e) { 324 LOG.debug("Interrupted while sleeping"); 325 Thread.currentThread().interrupt(); 326 } 327 continue; 328 } 329 assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get()); 330 break; 331 } 332 333 // This should not change with the introduction of the table quota for tn1 334 actualPolicyTN2 = snapshots.get(tn2); 335 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); 336 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get()); 337 } 338 339 @Test 340 public void testGetAllTablesWithQuotas() throws Exception { 341 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); 342 Set<TableName> tablesWithQuotas = new HashSet<>(); 343 Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); 344 // Partition the tables with quotas by table and ns quota 345 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); 346 347 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); 348 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); 349 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables()); 350 } 351 352 @Test 353 public void testRpcQuotaTablesAreFiltered() throws Exception { 354 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); 355 Set<TableName> tablesWithQuotas = new HashSet<>(); 356 Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); 357 // Partition the tables with quotas by table and ns quota 358 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); 359 360 TableName rpcQuotaTable = helper.createTable(); 361 TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory 362 .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES)); 363 364 // The `rpcQuotaTable` should not be included in this Set 365 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); 366 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); 367 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables()); 368 } 369 370 @Test 371 public void testFilterRegions() throws Exception { 372 Map<TableName,Integer> mockReportedRegions = new HashMap<>(); 373 // Can't mock because of primitive int as a return type -- Mockito 374 // can only handle an Integer. 375 TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(), 376 TEST_UTIL.getConfiguration()) { 377 @Override 378 int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) { 379 Integer i = mockReportedRegions.get(table); 380 if (i == null) { 381 return 0; 382 } 383 return i; 384 } 385 }; 386 387 // Create the tables 388 TableName tn1 = helper.createTableWithRegions(20); 389 TableName tn2 = helper.createTableWithRegions(20); 390 TableName tn3 = helper.createTableWithRegions(20); 391 392 // Add them to the Tables with Quotas object 393 tables.addTableQuotaTable(tn1); 394 tables.addTableQuotaTable(tn2); 395 tables.addTableQuotaTable(tn3); 396 397 // Mock the number of regions reported 398 mockReportedRegions.put(tn1, 10); // 50% 399 mockReportedRegions.put(tn2, 19); // 95% 400 mockReportedRegions.put(tn3, 20); // 100% 401 402 // Argument is un-used 403 tables.filterInsufficientlyReportedTables(null); 404 // The default of 95% reported should prevent tn1 from appearing 405 assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables()); 406 } 407 408 @Test 409 public void testFetchSpaceQuota() throws Exception { 410 Multimap<TableName,QuotaSettings> tables = helper.createTablesWithSpaceQuotas(); 411 // Can pass in an empty map, we're not consulting it. 412 chore.initializeSnapshotStores(Collections.emptyMap()); 413 // All tables that were created should have a quota defined. 414 for (Entry<TableName,QuotaSettings> entry : tables.entries()) { 415 final TableName table = entry.getKey(); 416 final QuotaSettings qs = entry.getValue(); 417 418 assertTrue("QuotaSettings was an instance of " + qs.getClass(), 419 qs instanceof SpaceLimitSettings); 420 421 SpaceQuota spaceQuota = null; 422 if (qs.getTableName() != null) { 423 spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table); 424 assertNotNull("Could not find table space quota for " + table, spaceQuota); 425 } else if (qs.getNamespace() != null) { 426 spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString()); 427 assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota); 428 } else { 429 fail("Expected table or namespace space quota"); 430 } 431 432 final SpaceLimitSettings sls = (SpaceLimitSettings) qs; 433 assertEquals(sls.getProto().getQuota(), spaceQuota); 434 } 435 436 TableName tableWithoutQuota = helper.createTable(); 437 assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota)); 438 } 439 440 private int numSnapshotsInViolation(Map<TableName,SpaceQuotaSnapshot> snapshots) { 441 int sum = 0; 442 for (SpaceQuotaSnapshot snapshot : snapshots.values()) { 443 if (snapshot.getQuotaStatus().isInViolation()) { 444 sum++; 445 } 446 } 447 return sum; 448 } 449 450 private void sleepWithInterrupt(long millis) { 451 try { 452 Thread.sleep(millis); 453 } catch (InterruptedException e) { 454 LOG.debug("Interrupted while sleeping"); 455 Thread.currentThread().interrupt(); 456 } 457 } 458}