/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Verifies that space quota usage is updated through the "low latency" in-memory paths (flush,
 * compaction, bulk load, snapshot accounting) rather than by the periodic filesystem-scanning
 * chores. The chores that read region/snapshot sizes from HDFS are configured with very long
 * periods in {@link #setup()}, so any usage change the tests observe must have come from the
 * low-latency reporting path.
 */
@Tag(MediumTests.TAG)
public class TestLowLatencySpaceQuotas {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Global for all tests in the class; gives each test's helper a unique counter value.
  private static final AtomicLong COUNTER = new AtomicLong(0);

  private SpaceQuotaHelperForTests helper;
  private Connection conn;
  private Admin admin;

  /**
   * Starts a single-node mini-cluster with the filesystem/snapshot size chores effectively
   * disabled (2-minute period/delay) so that only the low-latency size-reporting path can
   * update quota usage during a test run.
   */
  @BeforeAll
  public static void setup() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The default 1s period for QuotaObserverChore is good.
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    // Set the period/delay to read region size from HDFS to be very long
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
    // Set the same long period/delay to compute snapshot sizes
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
    // Clean up the compacted files faster than normal (5s instead of 2mins)
    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);

    TEST_UTIL.startMiniCluster(1);
  }

  @AfterAll
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Creates a fresh helper per test and waits for the quota table to be usable. */
  @BeforeEach
  public void removeAllQuotas(TestInfo testInfo) throws Exception {
    helper = new SpaceQuotaHelperForTests(TEST_UTIL, () -> testInfo.getTestMethod().get().getName(),
      COUNTER);
    conn = TEST_UTIL.getConnection();
    admin = TEST_UTIL.getAdmin();
    helper.waitForQuotaTable(conn);
  }

  /** A flush alone (no HDFS scan) should be enough for the quota snapshot to grow. */
  @Test
  public void testFlushes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data
    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, initialSize);

    // Make sure a flush happened
    admin.flush(tn);

    // We should be able to observe the system recording an increase in size (even
    // though we know the filesystem scanning did not happen).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= initialSize;
      }
    });
  }

  /** A major compaction should promptly shrink the reported usage back toward reality. */
  @Test
  public void testMajorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // Write the same data again, flushing it to a second file
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2L * sizePerBatch;
      }
    });

    // Rewrite the two files into one.
    admin.majorCompact(tn);

    // After we major compact the table, we should notice quickly that the amount of data in the
    // table is much closer to reality (the duplicate entries across the two files are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
      }
    });
  }

  /** A minor compaction should also update the reported usage without any HDFS scan. */
  @Test
  public void testMinorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final long numBatches = 6;
    for (long i = 0; i < numBatches; i++) {
      helper.writeData(tn, sizePerBatch);
      admin.flush(tn);
    }

    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    long numFiles = getNumHFilesForRegion(region);
    assertEquals(numBatches, numFiles);

    // Each flush produced an hfile of similar data; usage should reflect all of them.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFiles * sizePerBatch;
      }
    });

    // Rewrite some files into fewer
    TEST_UTIL.compact(tn, false);
    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);

    // After the minor compaction, we should notice quickly that the amount of data in the
    // table is much closer to reality (the duplicate entries across the compacted files
    // are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch
          && snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
      }
    });
  }

  /** Sums the hfile count across all stores of the given region. */
  private long getNumHFilesForRegion(HRegion region) {
    return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum();
  }

  /**
   * Bulk-loaded hfiles should be reflected in quota usage without a filesystem scan.
   * Compactions are disabled for the duration so the loaded files cannot be rewritten
   * out from under the size check; the try/finally guarantees they are re-enabled even
   * if an intermediate assertion fails.
   */
  @Test
  public void testBulkLoading(TestInfo testInfo) throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
    admin.compactionSwitch(false,
      admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
    try {
      Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tn, 3, 550);
      // Make sure the files are about as long as we expect
      FileSystem fs = TEST_UTIL.getTestFileSystem();
      FileStatus[] files = fs.listStatus(
        new Path(fs.getHomeDirectory(), testInfo.getTestMethod().get().getName() + "_files"));
      long totalSize = 0;
      for (FileStatus file : files) {
        assertTrue(file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE, "Expected the file, "
          + file.getPath() + ", length to be larger than 25KB, but was " + file.getLen());
        totalSize += file.getLen();
      }

      assertFalse(
        BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files).isEmpty(),
        "The bulk load failed");

      final long finalTotalSize = totalSize;
      TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
        @Override
        boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
          return snapshot.getUsage() >= finalTotalSize;
        }
      });
    } finally {
      // Always re-enable compactions so later tests see normal cluster behavior.
      admin.compactionSwitch(true,
        admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
    }
  }

  /**
   * After a snapshot's size has been computed, archiving the snapshotted file (via major
   * compaction) should double the table's reported usage, and the snapshot-size records in the
   * quota table should equal the original store file size.
   */
  @Test
  public void testSnapshotSizes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    final String snapshot1 = "snapshot1";
    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);

    // Compute the size of the file for the Region we'll send to archive
    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    List<? extends Store> stores = region.getStores();
    long summer = 0;
    for (Store store : stores) {
      summer += store.getStorefilesSize();
    }
    final long storeFileSize = summer;

    // Wait for the table to show the usage
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() == storeFileSize;
      }
    });

    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
    // to test in the absence of this chore.
    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
      .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));

    // Force a major compaction to create a new file and push the old file to the archive
    TEST_UTIL.compact(tn, true);

    // After moving the old file to archive/, the space of this table should double
    // We have a new file created by the majc referenced by the table and the snapshot still
    // referencing the old file.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2 * storeFileSize;
      }
    });

    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
      assertTrue(r != null && !r.isEmpty(), "Expected a non-null, non-empty Result");
      assertTrue(r.advance());
      assertEquals(storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()),
        "The snapshot's size should be the same as the origin store file");

      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
      assertTrue(r != null && !r.isEmpty(), "Expected a non-null, non-empty Result");
      assertTrue(r.advance());
      assertEquals(storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()),
        "The snapshot's size should be the same as the origin store file");
    }
  }
}