/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

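/**
 * Verifies that space quota usage is updated promptly from the region size reports that follow
 * flushes, compactions, and bulk loads, rather than from the filesystem-scanning chores, which
 * are configured below with very long periods so they effectively never run during these tests.
 */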
@Category({ MediumTests.class })
public class TestLowLatencySpaceQuotas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Global for all tests in the class
  private static final AtomicLong COUNTER = new AtomicLong(0);

  @Rule
  public TestName testName = new TestName();
  private SpaceQuotaHelperForTests helper;
  private Connection conn;
  private Admin admin;

  @BeforeClass
  public static void setup() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The default 1s period for QuotaObserverChore is good.
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    // Set the period/delay to read region size from HDFS to be very long
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
    // Set the same long period/delay to compute snapshot sizes
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
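    // With both chores effectively disabled, any usage changes the tests observe must come from
    // the RegionServers' in-memory size reports.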
    // Clean up the compacted files faster than normal (5s instead of 2mins)
    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);

    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void removeAllQuotas() throws Exception {
    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
    conn = TEST_UTIL.getConnection();
    admin = TEST_UTIL.getAdmin();
    helper.waitForQuotaTable(conn);
  }

  @Test
  public void testFlushes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
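    // The 1GB NO_INSERTS quota is never expected to be violated here; the tests only verify
    // that usage is accounted for promptly.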

    // Write some data
    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, initialSize);

    // Make sure a flush happened
    admin.flush(tn);

    // We should be able to observe the system recording an increase in size (even
    // though we know the filesystem scanning did not happen).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= initialSize;
      }
    });
  }

  @Test
  public void testMajorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // Write the same data again, flushing it to a second file
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // After two flushes, both hfiles contain nearly the same data, so we should see 2x the data.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2L * sizePerBatch;
      }
    });

    // Rewrite the two files into one.
    admin.majorCompact(tn);

    // After we major compact the table, we should notice quickly that the amount of data in the
    // table is much closer to reality (the duplicate entries across the two files are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
      }
    });
  }

  @Test
  public void testMinorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final long numBatches = 6;
    for (long i = 0; i < numBatches; i++) {
      helper.writeData(tn, sizePerBatch);
      admin.flush(tn);
    }

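    // Each flush should have produced exactly one new hfile.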
    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    long numFiles = getNumHFilesForRegion(region);
    assertEquals(numBatches, numFiles);

    // Each of the six hfiles holds one batch of similar data, so usage should reach at least
    // numFiles * sizePerBatch.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFiles * sizePerBatch;
      }
    });

    // Rewrite some files into fewer
    TEST_UTIL.compact(tn, false);
    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);

    // After the minor compaction, we should quickly see usage drop to reflect the smaller number
    // of files (the duplicate entries in the compacted files have been merged away).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch
          && snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
      }
    });
  }

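  /** Sums the number of HFiles across all stores of the given region. */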
  private long getNumHFilesForRegion(HRegion region) {
    return region.getStores().stream().mapToLong(s -> s.getNumHFiles()).sum();
  }

  @Test
  public void testBulkLoading() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
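    // Disable compactions so the freshly bulk-loaded files are not rewritten before we can
    // observe their reported size.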
    admin.compactionSwitch(false,
      admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
    Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tn, 3, 550);
    // Make sure the files are about the size we expect
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    FileStatus[] files =
      fs.listStatus(new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
    long totalSize = 0;
    for (FileStatus file : files) {
      assertTrue(
        "Expected file " + file.getPath() + " to be larger than 25KB, but it was " + file.getLen(),
        file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
      totalSize += file.getLen();
    }

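    // A non-empty result from bulkLoad means the files were actually loaded.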
    assertFalse("The bulk load failed",
      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files).isEmpty());

    final long finalTotalSize = totalSize;
    try {
      TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
        @Override
        boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
          return snapshot.getUsage() >= finalTotalSize;
        }
      });
    } finally {
      admin.compactionSwitch(true,
        admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
    }
  }

  @Test
  public void testSnapshotSizes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

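    // Take a snapshot; SKIPFLUSH is fine because the data was just flushed above.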
    final String snapshot1 = "snapshot1";
    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);

    // Compute the size of the store files for the Region whose file we'll send to archive
    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    final long storeFileSize =
      region.getStores().stream().mapToLong(Store::getStorefilesSize).sum();

    // Wait for the table to show the usage
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() == storeFileSize;
      }
    });

    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
    // to test in the absence of this chore.
    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
      .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));

    // Force a major compaction to create a new file and push the old file to the archive
    TEST_UTIL.compact(tn, true);

    // After moving the old file to archive/, the space usage of this table should double: the
    // table references the new file created by the major compaction, while the snapshot still
    // references the old file.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2 * storeFileSize;
      }
    });

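    // Verify that the persisted snapshot size matches the original store file size, both for the
    // table's snapshot entry and for the namespace rollup.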
    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the original store file",
        storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));

      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the original store file",
        storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
    }
  }
}