001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.util.Collections;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicLong;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.SnapshotType;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.Region;
044import org.apache.hadoop.hbase.regionserver.Store;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.junit.jupiter.api.TestInfo;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
055
056@Tag(MediumTests.TAG)
057public class TestLowLatencySpaceQuotas {
058
059  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060  // Global for all tests in the class
061  private static final AtomicLong COUNTER = new AtomicLong(0);
062
063  private SpaceQuotaHelperForTests helper;
064  private Connection conn;
065  private Admin admin;
066
067  @BeforeAll
068  public static void setup() throws Exception {
069    Configuration conf = TEST_UTIL.getConfiguration();
070    // The default 1s period for QuotaObserverChore is good.
071    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
072    // Set the period/delay to read region size from HDFS to be very long
073    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
074    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
075    // Set the same long period/delay to compute snapshot sizes
076    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
077    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
078    // Clean up the compacted files faster than normal (5s instead of 2mins)
079    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);
080
081    TEST_UTIL.startMiniCluster(1);
082  }
083
084  @AfterAll
085  public static void tearDown() throws Exception {
086    TEST_UTIL.shutdownMiniCluster();
087  }
088
089  @BeforeEach
090  public void removeAllQuotas(TestInfo testInfo) throws Exception {
091    helper = new SpaceQuotaHelperForTests(TEST_UTIL, () -> testInfo.getTestMethod().get().getName(),
092      COUNTER);
093    conn = TEST_UTIL.getConnection();
094    admin = TEST_UTIL.getAdmin();
095    helper.waitForQuotaTable(conn);
096  }
097
098  @Test
099  public void testFlushes() throws Exception {
100    TableName tn = helper.createTableWithRegions(1);
101    // Set a quota
102    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
103      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
104    admin.setQuota(settings);
105
106    // Write some data
107    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
108    helper.writeData(tn, initialSize);
109
110    // Make sure a flush happened
111    admin.flush(tn);
112
113    // We should be able to observe the system recording an increase in size (even
114    // though we know the filesystem scanning did not happen).
115    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
116      @Override
117      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
118        return snapshot.getUsage() >= initialSize;
119      }
120    });
121  }
122
123  @Test
124  public void testMajorCompaction() throws Exception {
125    TableName tn = helper.createTableWithRegions(1);
126    // Set a quota
127    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
128      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
129    admin.setQuota(settings);
130
131    // Write some data and flush it to disk.
132    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
133    helper.writeData(tn, sizePerBatch);
134    admin.flush(tn);
135
136    // Write the same data again, flushing it to a second file
137    helper.writeData(tn, sizePerBatch);
138    admin.flush(tn);
139
140    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
141    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
142      @Override
143      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
144        return snapshot.getUsage() >= 2L * sizePerBatch;
145      }
146    });
147
148    // Rewrite the two files into one.
149    admin.majorCompact(tn);
150
151    // After we major compact the table, we should notice quickly that the amount of data in the
152    // table is much closer to reality (the duplicate entries across the two files are removed).
153    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
154      @Override
155      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
156        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
157      }
158    });
159  }
160
161  @Test
162  public void testMinorCompaction() throws Exception {
163    TableName tn = helper.createTableWithRegions(1);
164    // Set a quota
165    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
166      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
167    admin.setQuota(settings);
168
169    // Write some data and flush it to disk.
170    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
171    final long numBatches = 6;
172    for (long i = 0; i < numBatches; i++) {
173      helper.writeData(tn, sizePerBatch);
174      admin.flush(tn);
175    }
176
177    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
178    long numFiles = getNumHFilesForRegion(region);
179    assertEquals(numBatches, numFiles);
180
181    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
182    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
183      @Override
184      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
185        return snapshot.getUsage() >= numFiles * sizePerBatch;
186      }
187    });
188
189    // Rewrite some files into fewer
190    TEST_UTIL.compact(tn, false);
191    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);
192
193    // After we major compact the table, we should notice quickly that the amount of data in the
194    // table is much closer to reality (the duplicate entries across the two files are removed).
195    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
196      @Override
197      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
198        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch
199          && snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
200      }
201    });
202  }
203
204  private long getNumHFilesForRegion(HRegion region) {
205    return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum();
206  }
207
208  @Test
209  public void testBulkLoading(TestInfo testInfo) throws Exception {
210    TableName tn = helper.createTableWithRegions(1);
211    // Set a quota
212    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
213      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
214    admin.setQuota(settings);
215    admin.compactionSwitch(false,
216      admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
217    Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tn, 3, 550);
218    // Make sure the files are about as long as we expect
219    FileSystem fs = TEST_UTIL.getTestFileSystem();
220    FileStatus[] files = fs.listStatus(
221      new Path(fs.getHomeDirectory(), testInfo.getTestMethod().get().getName() + "_files"));
222    long totalSize = 0;
223    for (FileStatus file : files) {
224      assertTrue(file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE, "Expected the file, "
225        + file.getPath() + ",  length to be larger than 25KB, but was " + file.getLen());
226      totalSize += file.getLen();
227    }
228
229    assertFalse(
230      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files).isEmpty(),
231      "The bulk load failed");
232
233    final long finalTotalSize = totalSize;
234    try {
235      TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
236        @Override
237        boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
238          return snapshot.getUsage() >= finalTotalSize;
239        }
240      });
241    } finally {
242      admin.compactionSwitch(true,
243        admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
244    }
245  }
246
247  @Test
248  public void testSnapshotSizes() throws Exception {
249    TableName tn = helper.createTableWithRegions(1);
250    // Set a quota
251    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
252      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
253    admin.setQuota(settings);
254
255    // Write some data and flush it to disk.
256    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
257    helper.writeData(tn, sizePerBatch);
258    admin.flush(tn);
259
260    final String snapshot1 = "snapshot1";
261    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);
262
263    // Compute the size of the file for the Region we'll send to archive
264    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
265    List<? extends Store> stores = region.getStores();
266    long summer = 0;
267    for (Store store : stores) {
268      summer += store.getStorefilesSize();
269    }
270    final long storeFileSize = summer;
271
272    // Wait for the table to show the usage
273    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
274      @Override
275      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
276        return snapshot.getUsage() == storeFileSize;
277      }
278    });
279
280    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
281    // to test in the absence of this chore.
282    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
283      .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
284    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));
285
286    // Force a major compaction to create a new file and push the old file to the archive
287    TEST_UTIL.compact(tn, true);
288
289    // After moving the old file to archive/, the space of this table should double
290    // We have a new file created by the majc referenced by the table and the snapshot still
291    // referencing the old file.
292    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
293      @Override
294      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
295        return snapshot.getUsage() >= 2 * storeFileSize;
296      }
297    });
298
299    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
300      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
301      assertTrue(r != null && !r.isEmpty(), "Expected a non-null, non-empty Result");
302      assertTrue(r.advance());
303      assertEquals(storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()),
304        "The snapshot's size should be the same as the origin store file");
305
306      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
307      assertTrue(r != null && !r.isEmpty(), "Expected a non-null, non-empty Result");
308      assertTrue(r.advance());
309      assertEquals(storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()),
310        "The snapshot's size should be the same as the origin store file");
311    }
312  }
313}