001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Map.Entry; 029import java.util.concurrent.atomic.AtomicLong; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.Append; 040import org.apache.hadoop.hbase.client.ClientServiceCallable; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Increment; 044import org.apache.hadoop.hbase.client.Mutation; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.ResultScanner; 049import org.apache.hadoop.hbase.client.RpcRetryingCaller; 050import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 051import org.apache.hadoop.hbase.client.Scan; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.master.HMaster; 054import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement; 055import org.apache.hadoop.hbase.regionserver.HRegionServer; 056import org.apache.hadoop.hbase.security.AccessDeniedException; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.util.StringUtils; 060import org.junit.AfterClass; 061import org.junit.Before; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Rule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.junit.rules.TestName; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071/** 072 * End-to-end test class for filesystem space quotas. 073 */ 074@Category(LargeTests.class) 075public class TestSpaceQuotas { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestSpaceQuotas.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestSpaceQuotas.class); 082 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 // Global for all tests in the class 084 private static final AtomicLong COUNTER = new AtomicLong(0); 085 private static final int NUM_RETRIES = 10; 086 087 @Rule 088 public TestName testName = new TestName(); 089 private SpaceQuotaHelperForTests helper; 090 091 @BeforeClass 092 public static void setUp() throws Exception { 093 Configuration conf = TEST_UTIL.getConfiguration(); 094 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 095 TEST_UTIL.startMiniCluster(1); 096 } 097 098 @AfterClass 099 public static void tearDown() throws Exception { 100 TEST_UTIL.shutdownMiniCluster(); 101 } 102 103 @Before 104 public void removeAllQuotas() throws Exception { 105 final Connection conn = TEST_UTIL.getConnection(); 106 if (helper == null) { 107 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 108 } 109 // Wait for the quota table to be created 110 if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { 111 helper.waitForQuotaTable(conn); 112 } else { 113 // Or, clean up any quotas from previous test runs. 114 helper.removeAllQuotas(conn); 115 assertEquals(0, helper.listNumDefinedQuotas(conn)); 116 } 117 } 118 119 @Test 120 public void testNoInsertsWithPut() throws Exception { 121 Put p = new Put(Bytes.toBytes("to_reject")); 122 p.addColumn( 123 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 124 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, p); 125 } 126 127 @Test 128 public void testNoInsertsWithAppend() throws Exception { 129 Append a = new Append(Bytes.toBytes("to_reject")); 130 a.addColumn( 131 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 132 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, a); 133 } 134 135 @Test 136 public void testNoInsertsWithIncrement() throws Exception { 137 Increment i = new Increment(Bytes.toBytes("to_reject")); 138 i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0); 139 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, i); 140 } 141 142 @Test 143 public void testDeletesAfterNoInserts() throws Exception { 144 final TableName tn = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS); 145 // Try a couple of times to verify that the quota never gets enforced, same as we 146 // do when we're trying to catch the failure. 147 Delete d = new Delete(Bytes.toBytes("should_not_be_rejected")); 148 for (int i = 0; i < NUM_RETRIES; i++) { 149 try (Table t = TEST_UTIL.getConnection().getTable(tn)) { 150 t.delete(d); 151 } 152 } 153 } 154 155 @Test 156 public void testNoWritesWithPut() throws Exception { 157 Put p = new Put(Bytes.toBytes("to_reject")); 158 p.addColumn( 159 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 160 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); 161 } 162 163 @Test 164 public void testNoWritesWithAppend() throws Exception { 165 Append a = new Append(Bytes.toBytes("to_reject")); 166 a.addColumn( 167 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 168 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, a); 169 } 170 171 @Test 172 public void testNoWritesWithIncrement() throws Exception { 173 Increment i = new Increment(Bytes.toBytes("to_reject")); 174 i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0); 175 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, i); 176 } 177 178 @Test 179 public void testNoWritesWithDelete() throws Exception { 180 Delete d = new Delete(Bytes.toBytes("to_reject")); 181 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, d); 182 } 183 184 @Test 185 public void testNoCompactions() throws Exception { 186 Put p = new Put(Bytes.toBytes("to_reject")); 187 p.addColumn( 188 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 189 final TableName tn = writeUntilViolationAndVerifyViolation( 190 SpaceViolationPolicy.NO_WRITES_COMPACTIONS, p); 191 // We know the policy is active at this point 192 193 // Major compactions should be rejected 194 try { 195 TEST_UTIL.getAdmin().majorCompact(tn); 196 fail("Expected that invoking the compaction should throw an Exception"); 197 } catch (DoNotRetryIOException e) { 198 // Expected! 199 } 200 // Minor compactions should also be rejected. 201 try { 202 TEST_UTIL.getAdmin().compact(tn); 203 fail("Expected that invoking the compaction should throw an Exception"); 204 } catch (DoNotRetryIOException e) { 205 // Expected! 206 } 207 } 208 209 @Test 210 public void testNoEnableAfterDisablePolicy() throws Exception { 211 Put p = new Put(Bytes.toBytes("to_reject")); 212 p.addColumn( 213 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 214 final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE); 215 final Admin admin = TEST_UTIL.getAdmin(); 216 // Disabling a table relies on some external action (over the other policies), so wait a bit 217 // more than the other tests. 218 for (int i = 0; i < NUM_RETRIES * 2; i++) { 219 if (admin.isTableEnabled(tn)) { 220 LOG.info(tn + " is still enabled, expecting it to be disabled. Will wait and re-check."); 221 Thread.sleep(2000); 222 } 223 } 224 assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn)); 225 try { 226 admin.enableTable(tn); 227 } catch (AccessDeniedException e) { 228 String exceptionContents = StringUtils.stringifyException(e); 229 final String expectedText = "violated space quota"; 230 assertTrue("Expected the exception to contain " + expectedText + ", but was: " 231 + exceptionContents, exceptionContents.contains(expectedText)); 232 } 233 } 234 235 @Test 236 public void testNoBulkLoadsWithNoWrites() throws Exception { 237 Put p = new Put(Bytes.toBytes("to_reject")); 238 p.addColumn( 239 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 240 TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); 241 242 // The table is now in violation. Try to do a bulk load 243 ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50); 244 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); 245 RpcRetryingCaller<Void> caller = factory.newCaller(); 246 try { 247 caller.callWithRetries(callable, Integer.MAX_VALUE); 248 fail("Expected the bulk load call to fail!"); 249 } catch (SpaceLimitingException e) { 250 // Pass 251 LOG.trace("Caught expected exception", e); 252 } 253 } 254 255 @Test 256 public void testAtomicBulkLoadUnderQuota() throws Exception { 257 // Need to verify that if the batch of hfiles cannot be loaded, none are loaded. 258 TableName tn = helper.createTableWithRegions(10); 259 260 final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE; 261 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( 262 tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS); 263 TEST_UTIL.getAdmin().setQuota(settings); 264 265 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 266 RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager(); 267 Map<TableName,SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots(); 268 Map<RegionInfo,Long> regionSizes = getReportedSizesForTable(tn); 269 while (true) { 270 SpaceQuotaSnapshot snapshot = snapshots.get(tn); 271 if (snapshot != null && snapshot.getLimit() > 0) { 272 break; 273 } 274 LOG.debug( 275 "Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: " + 276 regionSizes); 277 Thread.sleep(3000); 278 snapshots = spaceQuotaManager.copyQuotaSnapshots(); 279 regionSizes = getReportedSizesForTable(tn); 280 } 281 // Our quota limit should be reflected in the latest snapshot 282 SpaceQuotaSnapshot snapshot = snapshots.get(tn); 283 assertEquals(0L, snapshot.getUsage()); 284 assertEquals(sizeLimit, snapshot.getLimit()); 285 286 // We would also not have a "real" policy in violation 287 ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements(); 288 SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn); 289 assertTrue( 290 "Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(), 291 enforcement instanceof DefaultViolationPolicyEnforcement); 292 293 // Should generate two files, each of which is over 25KB each 294 ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 2, 525); 295 FileSystem fs = TEST_UTIL.getTestFileSystem(); 296 FileStatus[] files = fs.listStatus( 297 new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files")); 298 for (FileStatus file : files) { 299 assertTrue( 300 "Expected the file, " + file.getPath() + ", length to be larger than 25KB, but was " 301 + file.getLen(), 302 file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE); 303 LOG.debug(file.getPath() + " -> " + file.getLen() +"B"); 304 } 305 306 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); 307 RpcRetryingCaller<Void> caller = factory.newCaller(); 308 try { 309 caller.callWithRetries(callable, Integer.MAX_VALUE); 310 fail("Expected the bulk load call to fail!"); 311 } catch (SpaceLimitingException e) { 312 // Pass 313 LOG.trace("Caught expected exception", e); 314 } 315 // Verify that we have no data in the table because neither file should have been 316 // loaded even though one of the files could have. 317 Table table = TEST_UTIL.getConnection().getTable(tn); 318 ResultScanner scanner = table.getScanner(new Scan()); 319 try { 320 assertNull("Expected no results", scanner.next()); 321 } finally{ 322 scanner.close(); 323 } 324 } 325 326 @Test 327 public void testTableQuotaOverridesNamespaceQuota() throws Exception { 328 final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS; 329 final TableName tn = helper.createTableWithRegions(10); 330 331 // 2MB limit on the table, 1GB limit on the namespace 332 final long tableLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 333 final long namespaceLimit = 1024L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 334 TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy)); 335 TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitNamespaceSpace( 336 tn.getNamespaceAsString(), namespaceLimit, policy)); 337 338 // Write more data than should be allowed and flush it to disk 339 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 340 341 // This should be sufficient time for the chores to run and see the change. 342 Thread.sleep(5000); 343 344 // The write should be rejected because the table quota takes priority over the namespace 345 Put p = new Put(Bytes.toBytes("to_reject")); 346 p.addColumn( 347 Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); 348 verifyViolation(policy, tn, p); 349 } 350 351 private Map<RegionInfo,Long> getReportedSizesForTable(TableName tn) { 352 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 353 MasterQuotaManager quotaManager = master.getMasterQuotaManager(); 354 Map<RegionInfo,Long> filteredRegionSizes = new HashMap<>(); 355 for (Entry<RegionInfo,Long> entry : quotaManager.snapshotRegionSizes().entrySet()) { 356 if (entry.getKey().getTable().equals(tn)) { 357 filteredRegionSizes.put(entry.getKey(), entry.getValue()); 358 } 359 } 360 return filteredRegionSizes; 361 } 362 363 private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception { 364 TableName tn = helper.createTableWithRegions(10); 365 366 final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 367 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, policyToViolate); 368 TEST_UTIL.getAdmin().setQuota(settings); 369 370 // Write more data than should be allowed and flush it to disk 371 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 372 373 // This should be sufficient time for the chores to run and see the change. 374 Thread.sleep(5000); 375 376 return tn; 377 } 378 379 private TableName writeUntilViolationAndVerifyViolation( 380 SpaceViolationPolicy policyToViolate, Mutation m) throws Exception { 381 final TableName tn = writeUntilViolation(policyToViolate); 382 verifyViolation(policyToViolate, tn, m); 383 return tn; 384 } 385 386 private void verifyViolation( 387 SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) throws Exception { 388 // But let's try a few times to get the exception before failing 389 boolean sawError = false; 390 for (int i = 0; i < NUM_RETRIES && !sawError; i++) { 391 try (Table table = TEST_UTIL.getConnection().getTable(tn)) { 392 if (m instanceof Put) { 393 table.put((Put) m); 394 } else if (m instanceof Delete) { 395 table.delete((Delete) m); 396 } else if (m instanceof Append) { 397 table.append((Append) m); 398 } else if (m instanceof Increment) { 399 table.increment((Increment) m); 400 } else { 401 fail( 402 "Failed to apply " + m.getClass().getSimpleName() + 403 " to the table. Programming error"); 404 } 405 LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry"); 406 Thread.sleep(2000); 407 } catch (Exception e) { 408 String msg = StringUtils.stringifyException(e); 409 assertTrue("Expected exception message to contain the word '" + policyToViolate.name() + 410 "', but was " + msg, msg.contains(policyToViolate.name())); 411 sawError = true; 412 } 413 } 414 if (!sawError) { 415 try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 416 ResultScanner scanner = quotaTable.getScanner(new Scan()); 417 Result result = null; 418 LOG.info("Dumping contents of hbase:quota table"); 419 while ((result = scanner.next()) != null) { 420 LOG.info(Bytes.toString(result.getRow()) + " => " + result.toString()); 421 } 422 scanner.close(); 423 } 424 } 425 assertTrue( 426 "Expected to see an exception writing data to a table exceeding its quota", sawError); 427 } 428}