001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import static org.junit.Assert.assertEquals;
020import static org.junit.Assert.assertTrue;
021
022import java.util.Collections;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicLong;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ClientServiceCallable;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.RpcRetryingCaller;
038import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
039import org.apache.hadoop.hbase.client.SnapshotType;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.Region;
044import org.apache.hadoop.hbase.regionserver.Store;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
056
057@Category({MediumTests.class})
058public class TestLowLatencySpaceQuotas {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);
063
064  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
065  // Global for all tests in the class
066  private static final AtomicLong COUNTER = new AtomicLong(0);
067
068  @Rule
069  public TestName testName = new TestName();
070  private SpaceQuotaHelperForTests helper;
071  private Connection conn;
072  private Admin admin;
073
074  @BeforeClass
075  public static void setup() throws Exception {
076    Configuration conf = TEST_UTIL.getConfiguration();
077    // The default 1s period for QuotaObserverChore is good.
078    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
079    // Set the period/delay to read region size from HDFS to be very long
080    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
081    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
082    // Set the same long period/delay to compute snapshot sizes
083    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
084    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
085    // Clean up the compacted files faster than normal (5s instead of 2mins)
086    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);
087
088    TEST_UTIL.startMiniCluster(1);
089  }
090
091  @AfterClass
092  public static void tearDown() throws Exception {
093    TEST_UTIL.shutdownMiniCluster();
094  }
095
096  @Before
097  public void removeAllQuotas() throws Exception {
098    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
099    conn = TEST_UTIL.getConnection();
100    admin = TEST_UTIL.getAdmin();
101    helper.waitForQuotaTable(conn);
102  }
103
104  @Test
105  public void testFlushes() throws Exception {
106    TableName tn = helper.createTableWithRegions(1);
107    // Set a quota
108    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
109        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
110    admin.setQuota(settings);
111
112    // Write some data
113    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
114    helper.writeData(tn, initialSize);
115
116    // Make sure a flush happened
117    admin.flush(tn);
118
119    // We should be able to observe the system recording an increase in size (even
120    // though we know the filesystem scanning did not happen).
121    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
122      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
123        return snapshot.getUsage() >= initialSize;
124      }
125    });
126  }
127
128  @Test
129  public void testMajorCompaction() throws Exception {
130    TableName tn = helper.createTableWithRegions(1);
131    // Set a quota
132    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
133        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
134    admin.setQuota(settings);
135
136    // Write some data and flush it to disk.
137    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
138    helper.writeData(tn, sizePerBatch);
139    admin.flush(tn);
140
141    // Write the same data again, flushing it to a second file
142    helper.writeData(tn, sizePerBatch);
143    admin.flush(tn);
144
145    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
146    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
147      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
148        return snapshot.getUsage() >= 2L * sizePerBatch;
149      }
150    });
151
152    // Rewrite the two files into one.
153    admin.majorCompact(tn);
154
155    // After we major compact the table, we should notice quickly that the amount of data in the
156    // table is much closer to reality (the duplicate entries across the two files are removed).
157    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
158      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
159        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
160      }
161    });
162  }
163
164  @Test
165  public void testMinorCompaction() throws Exception {
166    TableName tn = helper.createTableWithRegions(1);
167    // Set a quota
168    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
169        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
170    admin.setQuota(settings);
171
172    // Write some data and flush it to disk.
173    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
174    final long numBatches = 6;
175    for (long i = 0; i < numBatches; i++) {
176      helper.writeData(tn, sizePerBatch);
177      admin.flush(tn);
178    }
179
180    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
181    long numFiles = getNumHFilesForRegion(region);
182    assertEquals(numBatches, numFiles);
183
184    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
185    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
186      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
187        return snapshot.getUsage() >= numFiles * sizePerBatch;
188      }
189    });
190
191    // Rewrite some files into fewer
192    TEST_UTIL.compact(tn, false);
193    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);
194
195    // After we major compact the table, we should notice quickly that the amount of data in the
196    // table is much closer to reality (the duplicate entries across the two files are removed).
197    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
198      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
199        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch &&
200            snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
201      }
202    });
203  }
204
205  private long getNumHFilesForRegion(HRegion region) {
206    return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum();
207  }
208
209  @Test
210  public void testBulkLoading() throws Exception {
211    TableName tn = helper.createTableWithRegions(1);
212    // Set a quota
213    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
214        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
215    admin.setQuota(settings);
216
217    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 3, 550);
218    // Make sure the files are about as long as we expect
219    FileSystem fs = TEST_UTIL.getTestFileSystem();
220    FileStatus[] files = fs.listStatus(
221        new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
222    long totalSize = 0;
223    for (FileStatus file : files) {
224      assertTrue(
225          "Expected the file, " + file.getPath() + ",  length to be larger than 25KB, but was "
226              + file.getLen(),
227          file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
228      totalSize += file.getLen();
229    }
230
231    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
232    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
233    caller.callWithRetries(callable, Integer.MAX_VALUE);
234
235    final long finalTotalSize = totalSize;
236    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
237      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
238        return snapshot.getUsage() >= finalTotalSize;
239      }
240    });
241  }
242
243  @Test
244  public void testSnapshotSizes() throws Exception {
245    TableName tn = helper.createTableWithRegions(1);
246    // Set a quota
247    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
248        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
249    admin.setQuota(settings);
250
251    // Write some data and flush it to disk.
252    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
253    helper.writeData(tn, sizePerBatch);
254    admin.flush(tn);
255
256    final String snapshot1 = "snapshot1";
257    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);
258
259    // Compute the size of the file for the Region we'll send to archive
260    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
261    List<? extends Store> stores = region.getStores();
262    long summer = 0;
263    for (Store store : stores) {
264      summer += store.getStorefilesSize();
265    }
266    final long storeFileSize = summer;
267
268    // Wait for the table to show the usage
269    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
270      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
271        return snapshot.getUsage() == storeFileSize;
272      }
273    });
274
275    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
276    // to test in the absence of this chore.
277    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
278        .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
279    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));
280
281    // Force a major compaction to create a new file and push the old file to the archive
282    TEST_UTIL.compact(tn, true);
283
284    // After moving the old file to archive/, the space of this table should double
285    // We have a new file created by the majc referenced by the table and the snapshot still
286    // referencing the old file.
287    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
288      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
289        return snapshot.getUsage() >= 2 * storeFileSize;
290      }
291    });
292
293    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
294      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
295      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
296      assertTrue(r.advance());
297      assertEquals("The snapshot's size should be the same as the origin store file",
298          storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
299
300      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
301      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
302      assertTrue(r.advance());
303      assertEquals("The snapshot's size should be the same as the origin store file",
304          storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
305    }
306  }
307}