001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Map; 031import java.util.Map.Entry; 032import java.util.Set; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.NamespaceNotFoundException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Admin; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.master.HMaster; 044import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.junit.AfterClass; 047import org.junit.Before; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Rule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.junit.rules.TestName; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 058import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 061 062/** 063 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster. 064 */ 065@Category(LargeTests.class) 066public class TestQuotaObserverChoreWithMiniCluster { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestQuotaObserverChoreWithMiniCluster.class); 071 072 private static final Logger LOG = 073 LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class); 074 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 075 private static final AtomicLong COUNTER = new AtomicLong(0); 076 private static final long DEFAULT_WAIT_MILLIS = 500; 077 078 @Rule 079 public TestName testName = new TestName(); 080 081 private HMaster master; 082 private QuotaObserverChore chore; 083 private SpaceQuotaSnapshotNotifierForTest snapshotNotifier; 084 private SpaceQuotaHelperForTests helper; 085 086 @BeforeClass 087 public static void setUp() throws Exception { 088 Configuration conf = TEST_UTIL.getConfiguration(); 089 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 090 conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY, 091 SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class); 092 TEST_UTIL.startMiniCluster(1); 093 } 094 095 @AfterClass 096 public static void tearDown() throws Exception { 097 TEST_UTIL.shutdownMiniCluster(); 098 } 099 100 @Before 101 public void removeAllQuotas() throws Exception { 102 final Connection conn = TEST_UTIL.getConnection(); 103 if (helper == null) { 104 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 105 } 106 // Wait for the quota table to be created 107 if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { 108 helper.waitForQuotaTable(conn); 109 } else { 110 // Or, clean up any quotas from previous test runs. 111 helper.removeAllQuotas(conn); 112 assertEquals(0, helper.listNumDefinedQuotas(conn)); 113 } 114 115 master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 116 snapshotNotifier = 117 (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier(); 118 assertNotNull(snapshotNotifier); 119 snapshotNotifier.clearSnapshots(); 120 chore = master.getQuotaObserverChore(); 121 } 122 123 @Test 124 public void testTableViolatesQuota() throws Exception { 125 TableName tn = helper.createTableWithRegions(10); 126 127 final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 128 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS; 129 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy); 130 TEST_UTIL.getAdmin().setQuota(settings); 131 132 // Write more data than should be allowed 133 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 134 135 Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots(); 136 boolean foundSnapshot = false; 137 while (!foundSnapshot) { 138 if (quotaSnapshots.isEmpty()) { 139 LOG.info("Found no violated quotas, sleeping and retrying. Current reports: " 140 + master.getMasterQuotaManager().snapshotRegionSizes()); 141 sleepWithInterrupt(DEFAULT_WAIT_MILLIS); 142 quotaSnapshots = snapshotNotifier.copySnapshots(); 143 } else { 144 Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet()); 145 assertEquals(tn, entry.getKey()); 146 final SpaceQuotaSnapshot snapshot = entry.getValue(); 147 if (!snapshot.getQuotaStatus().isInViolation()) { 148 LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot); 149 sleepWithInterrupt(DEFAULT_WAIT_MILLIS); 150 quotaSnapshots = snapshotNotifier.copySnapshots(); 151 } else { 152 foundSnapshot = true; 153 } 154 } 155 } 156 157 Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet()); 158 assertEquals(tn, entry.getKey()); 159 final SpaceQuotaSnapshot snapshot = entry.getValue(); 160 assertEquals("Snapshot was " + snapshot, violationPolicy, snapshot.getQuotaStatus().getPolicy()); 161 assertEquals(sizeLimit, snapshot.getLimit()); 162 assertTrue( 163 "The usage should be greater than the limit, but were " + snapshot.getUsage() + " and " 164 + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit()); 165 } 166 167 @Test 168 public void testNamespaceViolatesQuota() throws Exception { 169 final String namespace = testName.getMethodName(); 170 final Admin admin = TEST_UTIL.getAdmin(); 171 // Ensure the namespace exists 172 try { 173 admin.getNamespaceDescriptor(namespace); 174 } catch (NamespaceNotFoundException e) { 175 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); 176 admin.createNamespace(desc); 177 } 178 179 TableName tn1 = helper.createTableWithRegions(namespace, 5); 180 TableName tn2 = helper.createTableWithRegions(namespace, 5); 181 TableName tn3 = helper.createTableWithRegions(namespace, 5); 182 183 final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 184 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE; 185 QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy); 186 admin.setQuota(settings); 187 188 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 189 admin.flush(tn1); 190 Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots(); 191 for (int i = 0; i < 5; i++) { 192 // Check a few times to make sure we don't prematurely move to violation 193 assertEquals( 194 "Should not see any quota violations after writing 2MB of data", 0, 195 numSnapshotsInViolation(snapshots)); 196 try { 197 Thread.sleep(DEFAULT_WAIT_MILLIS); 198 } catch (InterruptedException e) { 199 LOG.debug("Interrupted while sleeping." , e); 200 } 201 snapshots = snapshotNotifier.copySnapshots(); 202 } 203 204 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 205 admin.flush(tn2); 206 snapshots = snapshotNotifier.copySnapshots(); 207 for (int i = 0; i < 5; i++) { 208 // Check a few times to make sure we don't prematurely move to violation 209 assertEquals("Should not see any quota violations after writing 4MB of data", 0, 210 numSnapshotsInViolation(snapshots)); 211 try { 212 Thread.sleep(DEFAULT_WAIT_MILLIS); 213 } catch (InterruptedException e) { 214 LOG.debug("Interrupted while sleeping." , e); 215 } 216 snapshots = snapshotNotifier.copySnapshots(); 217 } 218 219 // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total) 220 // and should push all three tables in the namespace into violation. 221 helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 222 admin.flush(tn3); 223 snapshots = snapshotNotifier.copySnapshots(); 224 while (numSnapshotsInViolation(snapshots) < 3) { 225 LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots 226 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); 227 try { 228 Thread.sleep(DEFAULT_WAIT_MILLIS); 229 } catch (InterruptedException e) { 230 LOG.debug("Interrupted while sleeping.", e); 231 Thread.currentThread().interrupt(); 232 } 233 snapshots = snapshotNotifier.copySnapshots(); 234 } 235 236 SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1); 237 assertNotNull("tn1 should be in violation", snapshot1); 238 assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy()); 239 SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2); 240 assertNotNull("tn2 should be in violation", snapshot2); 241 assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy()); 242 SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3); 243 assertNotNull("tn3 should be in violation", snapshot3); 244 assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy()); 245 assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty()); 246 } 247 248 @Test 249 public void testTableQuotaOverridesNamespaceQuota() throws Exception { 250 final String namespace = testName.getMethodName(); 251 final Admin admin = TEST_UTIL.getAdmin(); 252 // Ensure the namespace exists 253 try { 254 admin.getNamespaceDescriptor(namespace); 255 } catch (NamespaceNotFoundException e) { 256 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); 257 admin.createNamespace(desc); 258 } 259 260 TableName tn1 = helper.createTableWithRegions(namespace, 5); 261 TableName tn2 = helper.createTableWithRegions(namespace, 5); 262 263 final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 264 final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE; 265 QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace, 266 namespaceSizeLimit, namespaceViolationPolicy); 267 admin.setQuota(namespaceSettings); 268 269 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 270 admin.flush(tn1); 271 Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots(); 272 for (int i = 0; i < 5; i++) { 273 // Check a few times to make sure we don't prematurely move to violation 274 assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0, 275 numSnapshotsInViolation(snapshots)); 276 try { 277 Thread.sleep(DEFAULT_WAIT_MILLIS); 278 } catch (InterruptedException e) { 279 LOG.debug("Interrupted while sleeping." , e); 280 } 281 snapshots = snapshotNotifier.copySnapshots(); 282 } 283 284 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 285 admin.flush(tn2); 286 snapshots = snapshotNotifier.copySnapshots(); 287 while (numSnapshotsInViolation(snapshots) < 2) { 288 LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots 289 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); 290 try { 291 Thread.sleep(DEFAULT_WAIT_MILLIS); 292 } catch (InterruptedException e) { 293 LOG.debug("Interrupted while sleeping.", e); 294 Thread.currentThread().interrupt(); 295 } 296 snapshots = snapshotNotifier.copySnapshots(); 297 } 298 299 SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1); 300 assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1); 301 assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy()); 302 SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2); 303 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); 304 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy()); 305 306 // Override the namespace quota with a table quota 307 final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE; 308 final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS; 309 QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, 310 tableViolationPolicy); 311 admin.setQuota(tableSettings); 312 313 // Keep checking for the table quota policy to override the namespace quota 314 while (true) { 315 snapshots = snapshotNotifier.copySnapshots(); 316 SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1); 317 assertNotNull("Violation policy should never be null", actualTableSnapshot); 318 if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy()) { 319 LOG.debug("Saw unexpected table violation policy, waiting and re-checking."); 320 try { 321 Thread.sleep(DEFAULT_WAIT_MILLIS); 322 } catch (InterruptedException e) { 323 LOG.debug("Interrupted while sleeping"); 324 Thread.currentThread().interrupt(); 325 } 326 continue; 327 } 328 assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy()); 329 break; 330 } 331 332 // This should not change with the introduction of the table quota for tn1 333 actualPolicyTN2 = snapshots.get(tn2); 334 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); 335 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy()); 336 } 337 338 @Test 339 public void testGetAllTablesWithQuotas() throws Exception { 340 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); 341 Set<TableName> tablesWithQuotas = new HashSet<>(); 342 Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); 343 // Partition the tables with quotas by table and ns quota 344 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); 345 346 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); 347 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); 348 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables()); 349 } 350 351 @Test 352 public void testRpcQuotaTablesAreFiltered() throws Exception { 353 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); 354 Set<TableName> tablesWithQuotas = new HashSet<>(); 355 Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); 356 // Partition the tables with quotas by table and ns quota 357 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); 358 359 TableName rpcQuotaTable = helper.createTable(); 360 TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory 361 .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES)); 362 363 // The `rpcQuotaTable` should not be included in this Set 364 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); 365 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); 366 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables()); 367 } 368 369 @Test 370 public void testFilterRegions() throws Exception { 371 Map<TableName,Integer> mockReportedRegions = new HashMap<>(); 372 // Can't mock because of primitive int as a return type -- Mockito 373 // can only handle an Integer. 374 TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(), 375 TEST_UTIL.getConfiguration()) { 376 @Override 377 int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) { 378 Integer i = mockReportedRegions.get(table); 379 if (i == null) { 380 return 0; 381 } 382 return i; 383 } 384 }; 385 386 // Create the tables 387 TableName tn1 = helper.createTableWithRegions(20); 388 TableName tn2 = helper.createTableWithRegions(20); 389 TableName tn3 = helper.createTableWithRegions(20); 390 391 // Add them to the Tables with Quotas object 392 tables.addTableQuotaTable(tn1); 393 tables.addTableQuotaTable(tn2); 394 tables.addTableQuotaTable(tn3); 395 396 // Mock the number of regions reported 397 mockReportedRegions.put(tn1, 10); // 50% 398 mockReportedRegions.put(tn2, 19); // 95% 399 mockReportedRegions.put(tn3, 20); // 100% 400 401 // Argument is un-used 402 tables.filterInsufficientlyReportedTables(null); 403 // The default of 95% reported should prevent tn1 from appearing 404 assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables()); 405 } 406 407 @Test 408 public void testFetchSpaceQuota() throws Exception { 409 Multimap<TableName,QuotaSettings> tables = helper.createTablesWithSpaceQuotas(); 410 // Can pass in an empty map, we're not consulting it. 411 chore.initializeSnapshotStores(Collections.emptyMap()); 412 // All tables that were created should have a quota defined. 413 for (Entry<TableName,QuotaSettings> entry : tables.entries()) { 414 final TableName table = entry.getKey(); 415 final QuotaSettings qs = entry.getValue(); 416 417 assertTrue("QuotaSettings was an instance of " + qs.getClass(), 418 qs instanceof SpaceLimitSettings); 419 420 SpaceQuota spaceQuota = null; 421 if (qs.getTableName() != null) { 422 spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table); 423 assertNotNull("Could not find table space quota for " + table, spaceQuota); 424 } else if (qs.getNamespace() != null) { 425 spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString()); 426 assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota); 427 } else { 428 fail("Expected table or namespace space quota"); 429 } 430 431 final SpaceLimitSettings sls = (SpaceLimitSettings) qs; 432 assertEquals(sls.getProto().getQuota(), spaceQuota); 433 } 434 435 TableName tableWithoutQuota = helper.createTable(); 436 assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota)); 437 } 438 439 private int numSnapshotsInViolation(Map<TableName,SpaceQuotaSnapshot> snapshots) { 440 int sum = 0; 441 for (SpaceQuotaSnapshot snapshot : snapshots.values()) { 442 if (snapshot.getQuotaStatus().isInViolation()) { 443 sum++; 444 } 445 } 446 return sum; 447 } 448 449 private void sleepWithInterrupt(long millis) { 450 try { 451 Thread.sleep(millis); 452 } catch (InterruptedException e) { 453 LOG.debug("Interrupted while sleeping"); 454 Thread.currentThread().interrupt(); 455 } 456 } 457}