/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * End-to-end test class for filesystem space quotas.
 *
 * <p>Each test creates a table, sets a space quota with a given {@link SpaceViolationPolicy} on
 * it, writes more data than the quota allows and then checks that the operations the policy is
 * meant to block are rejected (or, for operations the policy permits, are still accepted).
 */
@Category(LargeTests.class)
public class TestSpaceQuotas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSpaceQuotas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSpaceQuotas.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Global for all tests in the class
  private static final AtomicLong COUNTER = new AtomicLong(0);
  // How many times a check is repeated before the test gives up waiting for a policy.
  private static final int NUM_RETRIES = 10;

  @Rule
  public TestName testName = new TestName();
  private SpaceQuotaHelperForTests helper;

  /** Applies the quota-related test configuration and starts the shared mini cluster. */
  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    TEST_UTIL.startMiniCluster(1);
  }

  /** Shuts down the shared mini cluster once every test in the class has run. */
  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Ensures each test starts without any defined quotas: waits for the quota table if it does
   * not exist yet, otherwise removes every quota left over from an earlier test.
   */
  @Before
  public void removeAllQuotas() throws Exception {
    final Connection conn = TEST_UTIL.getConnection();
    if (helper == null) {
      helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
    }
    // Connection#getAdmin() hands out a new Admin each time; close it when we are done.
    try (Admin admin = conn.getAdmin()) {
      if (!admin.tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
        // Wait for the quota table to be created
        helper.waitForQuotaTable(conn);
      } else {
        // Or, clean up any quotas from previous test runs.
        helper.removeAllQuotas(conn);
        assertEquals(0, helper.listNumDefinedQuotas(conn));
      }
    }
  }

  @Test
  public void testNoInsertsWithPut() throws Exception {
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, newPutToReject());
  }

  @Test
  public void testNoInsertsWithAppend() throws Exception {
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, newAppendToReject());
  }

  @Test
  public void testNoInsertsWithIncrement() throws Exception {
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS,
      newIncrementToReject());
  }

  /** Deletes must keep working while a table is in violation of a NO_INSERTS quota. */
  @Test
  public void testDeletesAfterNoInserts() throws Exception {
    final TableName tn = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS);
    // Try a couple of times to verify that the quota never gets enforced, same as we
    // do when we're trying to catch the failure.
    Delete d = new Delete(Bytes.toBytes("should_not_be_rejected"));
    for (int i = 0; i < NUM_RETRIES; i++) {
      try (Table t = TEST_UTIL.getConnection().getTable(tn)) {
        t.delete(d);
      }
    }
  }

  @Test
  public void testNoWritesWithPut() throws Exception {
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, newPutToReject());
  }

  @Test
  public void testNoWritesWithAppend() throws Exception {
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, newAppendToReject());
  }

  @Test
  public void testNoWritesWithIncrement() throws Exception {
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES,
      newIncrementToReject());
  }

  @Test
  public void testNoWritesWithDelete() throws Exception {
    Delete d = new Delete(Bytes.toBytes("to_reject"));
    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, d);
  }

  /** Both major and minor compactions must be refused under NO_WRITES_COMPACTIONS. */
  @Test
  public void testNoCompactions() throws Exception {
    final TableName tn = writeUntilViolationAndVerifyViolation(
      SpaceViolationPolicy.NO_WRITES_COMPACTIONS, newPutToReject());
    // We know the policy is active at this point

    // Major compactions should be rejected
    try {
      TEST_UTIL.getAdmin().majorCompact(tn);
      fail("Expected that invoking the compaction should throw an Exception");
    } catch (DoNotRetryIOException e) {
      // Expected!
    }
    // Minor compactions should also be rejected.
    try {
      TEST_UTIL.getAdmin().compact(tn);
      fail("Expected that invoking the compaction should throw an Exception");
    } catch (DoNotRetryIOException e) {
      // Expected!
    }
  }

  /**
   * Once the DISABLE policy has disabled a table, an attempt to re-enable it that is refused
   * with an {@link AccessDeniedException} must mention the violated space quota.
   */
  @Test
  public void testNoEnableAfterDisablePolicy() throws Exception {
    final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE);
    final Admin admin = TEST_UTIL.getAdmin();
    // Disabling a table relies on some external action (over the other policies), so wait a bit
    // more than the other tests. Stop polling as soon as the table is seen disabled.
    for (int i = 0; i < NUM_RETRIES * 2 && admin.isTableEnabled(tn); i++) {
      LOG.info("{} is still enabled, expecting it to be disabled. Will wait and re-check.", tn);
      Thread.sleep(2000);
    }
    assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn));
    // NOTE(review): this test also passes when enableTable() succeeds; if the enable is
    // guaranteed to be rejected, a fail() after the call would make the test stricter.
    try {
      admin.enableTable(tn);
    } catch (AccessDeniedException e) {
      String exceptionContents = StringUtils.stringifyException(e);
      final String expectedText = "violated space quota";
      assertTrue(
        "Expected the exception to contain " + expectedText + ", but was: " + exceptionContents,
        exceptionContents.contains(expectedText));
    }
  }

  /** A bulk load into a table that is violating a NO_WRITES quota must be rejected. */
  @Test
  public void testNoBulkLoadsWithNoWrites() throws Exception {
    TableName tableName =
      writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, newPutToReject());

    // The table is now in violation. Try to do a bulk load
    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    try {
      caller.callWithRetries(callable, Integer.MAX_VALUE);
      fail("Expected the bulk load call to fail!");
    } catch (SpaceLimitingException e) {
      // Pass
      LOG.trace("Caught expected exception", e);
    }
  }

  /**
   * Verifies that a bulk load is all-or-nothing with respect to the quota: when the batch of
   * files as a whole would exceed the limit, none of the files ends up in the table.
   */
  @Test
  public void testAtomicBulkLoadUnderQuota() throws Exception {
    // Need to verify that if the batch of hfiles cannot be loaded, none are loaded.
    TableName tn = helper.createTableWithRegions(10);

    final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
    QuotaSettings settings =
      QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS);
    TEST_UTIL.getAdmin().setQuota(settings);

    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
    RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager();
    Map<TableName, SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots();
    Map<RegionInfo, Long> regionSizes = getReportedSizesForTable(tn);
    // Poll until the region server's snapshot for the table carries a positive limit.
    while (true) {
      SpaceQuotaSnapshot snapshot = snapshots.get(tn);
      if (snapshot != null && snapshot.getLimit() > 0) {
        break;
      }
      LOG.debug("Snapshot does not yet realize quota limit: {}, regionsizes: {}", snapshots,
        regionSizes);
      Thread.sleep(3000);
      snapshots = spaceQuotaManager.copyQuotaSnapshots();
      regionSizes = getReportedSizesForTable(tn);
    }
    // Our quota limit should be reflected in the latest snapshot
    SpaceQuotaSnapshot snapshot = snapshots.get(tn);
    assertEquals(0L, snapshot.getUsage());
    assertEquals(sizeLimit, snapshot.getLimit());

    // We would also not have a "real" policy in violation
    ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements();
    SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn);
    assertTrue("Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(),
      enforcement instanceof DefaultViolationPolicyEnforcement);

    // Should generate two files, each of which is over 25KB
    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 2, 525);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    FileStatus[] files =
      fs.listStatus(new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
    for (FileStatus file : files) {
      assertTrue("Expected the file, " + file.getPath()
        + ", length to be larger than 25KB, but was " + file.getLen(),
        file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
      LOG.debug("{} -> {}B", file.getPath(), file.getLen());
    }

    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    try {
      caller.callWithRetries(callable, Integer.MAX_VALUE);
      fail("Expected the bulk load call to fail!");
    } catch (SpaceLimitingException e) {
      // Pass
      LOG.trace("Caught expected exception", e);
    }
    // Verify that we have no data in the table because neither file should have been
    // loaded even though one of the files could have.
    try (Table table = TEST_UTIL.getConnection().getTable(tn);
      ResultScanner scanner = table.getScanner(new Scan())) {
      assertNull("Expected no results", scanner.next());
    }
  }

  /** A table quota must be enforced even when a much larger namespace quota is also set. */
  @Test
  public void testTableQuotaOverridesNamespaceQuota() throws Exception {
    final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
    final TableName tn = helper.createTableWithRegions(10);

    // 2MB limit on the table, 1GB limit on the namespace
    final long tableLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final long namespaceLimit = 1024L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy));
    TEST_UTIL.getAdmin().setQuota(
      QuotaSettingsFactory.limitNamespaceSpace(tn.getNamespaceAsString(), namespaceLimit, policy));

    // Write more data than should be allowed and flush it to disk
    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);

    // This should be sufficient time for the chores to run and see the change.
    Thread.sleep(5000);

    // The write should be rejected because the table quota takes priority over the namespace
    verifyViolation(policy, tn, newPutToReject());
  }

  /** Builds the Put that the tests expect to be rejected once a quota is violated. */
  private static Put newPutToReject() {
    Put p = new Put(Bytes.toBytes("to_reject"));
    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
      Bytes.toBytes("reject"));
    return p;
  }

  /** Builds the Append that the tests expect to be rejected once a quota is violated. */
  private static Append newAppendToReject() {
    Append a = new Append(Bytes.toBytes("to_reject"));
    a.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
      Bytes.toBytes("reject"));
    return a;
  }

  /** Builds the Increment that the tests expect to be rejected once a quota is violated. */
  private static Increment newIncrementToReject() {
    Increment i = new Increment(Bytes.toBytes("to_reject"));
    i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0);
    return i;
  }

  /**
   * Returns the region sizes currently held by the master's quota manager, restricted to the
   * regions of the given table.
   *
   * @param tn the table whose regions are of interest
   * @return a new map from region to reported size; empty when nothing matched
   */
  private Map<RegionInfo, Long> getReportedSizesForTable(TableName tn) {
    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
    MasterQuotaManager quotaManager = master.getMasterQuotaManager();
    Map<RegionInfo, Long> filteredRegionSizes = new HashMap<>();
    for (Entry<RegionInfo, Long> entry : quotaManager.snapshotRegionSizes().entrySet()) {
      if (entry.getKey().getTable().equals(tn)) {
        filteredRegionSizes.put(entry.getKey(), entry.getValue());
      }
    }
    return filteredRegionSizes;
  }

  /**
   * Creates a table with a 2MB space quota using the given policy, writes 3MB of data to it and
   * then sleeps to give the quota chores time to notice.
   *
   * @param policyToViolate the policy to attach to the table quota
   * @return the name of the newly created table
   */
  private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception {
    TableName tn = helper.createTableWithRegions(10);

    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, policyToViolate);
    TEST_UTIL.getAdmin().setQuota(settings);

    // Write more data than should be allowed and flush it to disk
    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);

    // This should be sufficient time for the chores to run and see the change.
    Thread.sleep(5000);

    return tn;
  }

  /**
   * Puts a fresh table into violation of {@code policyToViolate} and then verifies that the
   * given mutation is rejected.
   *
   * @param policyToViolate the policy to attach to the table quota
   * @param m the mutation which is expected to be rejected
   * @return the name of the table that was created
   */
  private TableName writeUntilViolationAndVerifyViolation(SpaceViolationPolicy policyToViolate,
    Mutation m) throws Exception {
    final TableName tn = writeUntilViolation(policyToViolate);
    verifyViolation(policyToViolate, tn, m);
    return tn;
  }

  /**
   * Repeatedly applies {@code m} to the table until it is rejected, asserting that the rejection
   * mentions the policy's name. Fails if no attempt out of {@link #NUM_RETRIES} is rejected.
   *
   * @param policyToViolate the policy whose name must appear in the exception
   * @param tn the table to apply the mutation to
   * @param m a Put, Delete, Append or Increment; any other type fails the test
   */
  private void verifyViolation(SpaceViolationPolicy policyToViolate, TableName tn, Mutation m)
    throws Exception {
    // But let's try a few times to get the exception before failing
    boolean sawError = false;
    for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
      try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
        if (m instanceof Put) {
          table.put((Put) m);
        } else if (m instanceof Delete) {
          table.delete((Delete) m);
        } else if (m instanceof Append) {
          table.append((Append) m);
        } else if (m instanceof Increment) {
          table.increment((Increment) m);
        } else {
          fail(
            "Failed to apply " + m.getClass().getSimpleName() + " to the table. Programming error");
        }
        LOG.info("Did not reject the {}, will sleep and retry", m.getClass().getSimpleName());
        Thread.sleep(2000);
      } catch (Exception e) {
        String msg = StringUtils.stringifyException(e);
        assertTrue("Expected exception message to contain the word '" + policyToViolate.name()
          + "', but was " + msg, msg.contains(policyToViolate.name()));
        sawError = true;
      }
    }
    if (!sawError) {
      // Leave a trace of what the quota table held to help diagnose the failure below.
      dumpQuotaTable();
    }
    assertTrue("Expected to see an exception writing data to a table exceeding its quota",
      sawError);
  }

  /** Logs every row of the quota table at INFO level. */
  private void dumpQuotaTable() throws Exception {
    try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
      ResultScanner scanner = quotaTable.getScanner(new Scan())) {
      LOG.info("Dumping contents of hbase:quota table");
      Result result;
      while ((result = scanner.next()) != null) {
        LOG.info("{} => {}", Bytes.toString(result.getRow()), result);
      }
    }
  }
}