001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import static org.junit.Assert.assertEquals; 020import static org.junit.Assert.assertTrue; 021 022import java.util.Collections; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicLong; 025import java.util.stream.Collectors; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ClientServiceCallable; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.RpcRetryingCaller; 040import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 041import org.apache.hadoop.hbase.client.SnapshotType; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate; 044import org.apache.hadoop.hbase.regionserver.HRegion; 045import org.apache.hadoop.hbase.regionserver.Region; 046import org.apache.hadoop.hbase.regionserver.Store; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.junit.AfterClass; 049import org.junit.Before; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Rule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.junit.rules.TestName; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 058 059@Category({MediumTests.class}) 060public class TestLowLatencySpaceQuotas { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class); 065 066 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 067 // Global for all tests in the class 068 private static final AtomicLong COUNTER = new AtomicLong(0); 069 070 @Rule 071 public TestName testName = new TestName(); 072 private SpaceQuotaHelperForTests helper; 073 private Connection conn; 074 private Admin admin; 075 076 @BeforeClass 077 public static void setup() throws Exception { 078 Configuration conf = TEST_UTIL.getConfiguration(); 079 // The default 1s period for QuotaObserverChore is good. 080 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 081 // Set the period/delay to read region size from HDFS to be very long 082 conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120); 083 conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120); 084 // Set the same long period/delay to compute snapshot sizes 085 conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120); 086 conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120); 087 // Clean up the compacted files faster than normal (5s instead of 2mins) 088 conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000); 089 090 TEST_UTIL.startMiniCluster(1); 091 } 092 093 @AfterClass 094 public static void tearDown() throws Exception { 095 TEST_UTIL.shutdownMiniCluster(); 096 } 097 098 @Before 099 public void removeAllQuotas() throws Exception { 100 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 101 conn = TEST_UTIL.getConnection(); 102 admin = TEST_UTIL.getAdmin(); 103 helper.waitForQuotaTable(conn); 104 } 105 106 @Test 107 public void testFlushes() throws Exception { 108 TableName tn = helper.createTableWithRegions(1); 109 // Set a quota 110 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( 111 tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); 112 admin.setQuota(settings); 113 114 // Write some data 115 final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 116 helper.writeData(tn, initialSize); 117 118 // Make sure a flush happened 119 admin.flush(tn); 120 121 // We should be able to observe the system recording an increase in size (even 122 // though we know the filesystem scanning did not happen). 123 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 124 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 125 return snapshot.getUsage() >= initialSize; 126 } 127 }); 128 } 129 130 @Test 131 public void testMajorCompaction() throws Exception { 132 TableName tn = helper.createTableWithRegions(1); 133 // Set a quota 134 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( 135 tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); 136 admin.setQuota(settings); 137 138 // Write some data and flush it to disk. 139 final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 140 helper.writeData(tn, sizePerBatch); 141 admin.flush(tn); 142 143 // Write the same data again, flushing it to a second file 144 helper.writeData(tn, sizePerBatch); 145 admin.flush(tn); 146 147 // After two flushes, both hfiles would contain similar data. We should see 2x the data. 148 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 149 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 150 return snapshot.getUsage() >= 2L * sizePerBatch; 151 } 152 }); 153 154 // Rewrite the two files into one. 155 admin.majorCompact(tn); 156 157 // After we major compact the table, we should notice quickly that the amount of data in the 158 // table is much closer to reality (the duplicate entries across the two files are removed). 159 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 160 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 161 return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch; 162 } 163 }); 164 } 165 166 @Test 167 public void testMinorCompaction() throws Exception { 168 TableName tn = helper.createTableWithRegions(1); 169 // Set a quota 170 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( 171 tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); 172 admin.setQuota(settings); 173 174 // Write some data and flush it to disk. 175 final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 176 final long numBatches = 6; 177 for (long i = 0; i < numBatches; i++) { 178 helper.writeData(tn, sizePerBatch); 179 admin.flush(tn); 180 } 181 182 HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn)); 183 long numFiles = getNumHFilesForRegion(region); 184 assertEquals(numBatches, numFiles); 185 186 // After two flushes, both hfiles would contain similar data. We should see 2x the data. 187 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 188 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 189 return snapshot.getUsage() >= numFiles * sizePerBatch; 190 } 191 }); 192 193 // Rewrite some files into fewer 194 TEST_UTIL.compact(tn, false); 195 long numFilesAfterMinorCompaction = getNumHFilesForRegion(region); 196 197 // After we major compact the table, we should notice quickly that the amount of data in the 198 // table is much closer to reality (the duplicate entries across the two files are removed). 199 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 200 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 201 return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch && 202 snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch; 203 } 204 }); 205 } 206 207 private long getNumHFilesForRegion(HRegion region) { 208 return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum(); 209 } 210 211 @Test 212 public void testBulkLoading() throws Exception { 213 TableName tn = helper.createTableWithRegions(1); 214 // Set a quota 215 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( 216 tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); 217 admin.setQuota(settings); 218 admin.compactionSwitch(false, 219 admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList())); 220 221 ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 3, 550); 222 // Make sure the files are about as long as we expect 223 FileSystem fs = TEST_UTIL.getTestFileSystem(); 224 FileStatus[] files = fs.listStatus( 225 new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files")); 226 long totalSize = 0; 227 for (FileStatus file : files) { 228 assertTrue( 229 "Expected the file, " + file.getPath() + ", length to be larger than 25KB, but was " 230 + file.getLen(), 231 file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE); 232 totalSize += file.getLen(); 233 } 234 235 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); 236 RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); 237 caller.callWithRetries(callable, Integer.MAX_VALUE); 238 239 final long finalTotalSize = totalSize; 240 try { 241 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 242 @Override 243 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 244 return snapshot.getUsage() >= finalTotalSize; 245 } 246 }); 247 } finally { 248 admin.compactionSwitch(true, 249 admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList())); 250 } 251 } 252 253 @Test 254 public void testSnapshotSizes() throws Exception { 255 TableName tn = helper.createTableWithRegions(1); 256 // Set a quota 257 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( 258 tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); 259 admin.setQuota(settings); 260 261 // Write some data and flush it to disk. 262 final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 263 helper.writeData(tn, sizePerBatch); 264 admin.flush(tn); 265 266 final String snapshot1 = "snapshot1"; 267 admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH); 268 269 // Compute the size of the file for the Region we'll send to archive 270 Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn)); 271 List<? extends Store> stores = region.getStores(); 272 long summer = 0; 273 for (Store store : stores) { 274 summer += store.getStorefilesSize(); 275 } 276 final long storeFileSize = summer; 277 278 // Wait for the table to show the usage 279 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 280 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 281 return snapshot.getUsage() == storeFileSize; 282 } 283 }); 284 285 // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want 286 // to test in the absence of this chore. 287 FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster() 288 .getSnapshotQuotaObserverChore().getNotifierForTable(tn); 289 notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1)); 290 291 // Force a major compaction to create a new file and push the old file to the archive 292 TEST_UTIL.compact(tn, true); 293 294 // After moving the old file to archive/, the space of this table should double 295 // We have a new file created by the majc referenced by the table and the snapshot still 296 // referencing the old file. 297 TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { 298 @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 299 return snapshot.getUsage() >= 2 * storeFileSize; 300 } 301 }); 302 303 try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 304 Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1)); 305 assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty()); 306 assertTrue(r.advance()); 307 assertEquals("The snapshot's size should be the same as the origin store file", 308 storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current())); 309 310 r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString())); 311 assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty()); 312 assertTrue(r.advance()); 313 assertEquals("The snapshot's size should be the same as the origin store file", 314 storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current())); 315 } 316 } 317}