/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
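/**
 * Tests that space quota usage is updated with low latency: size reports from flushes,
 * compactions, bulk loads, and snapshots should reach the quota snapshots directly, without
 * waiting on the filesystem-scanning chores (which the setup below deliberately slows down).
 */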
@Category({MediumTests.class})
public class TestLowLatencySpaceQuotas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Global for all tests in the class
  private static final AtomicLong COUNTER = new AtomicLong(0);

  @Rule
  public TestName testName = new TestName();
  private SpaceQuotaHelperForTests helper;
  private Connection conn;
  private Admin admin;

  @BeforeClass
  public static void setup() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The default 1s period for QuotaObserverChore is good.
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    // Set the period/delay to read region size from HDFS to be very long
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
    // Set the same long period/delay to compute snapshot sizes
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
    // Clean up the compacted files faster than normal (5s instead of 2mins)
    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);

    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void removeAllQuotas() throws Exception {
    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
    conn = TEST_UTIL.getConnection();
    admin = TEST_UTIL.getAdmin();
    helper.waitForQuotaTable(conn);
  }

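  /**
   * Verifies that a flush reports its new store file size to the quota machinery without
   * waiting on a filesystem scan.
   */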
  @Test
  public void testFlushes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data
    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, initialSize);

    // Make sure a flush happened
    admin.flush(tn);

    // We should be able to observe the system recording an increase in size (even
    // though we know the filesystem scanning did not happen).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= initialSize;
      }
    });
  }

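  /**
   * Verifies that a major compaction's rewritten, smaller footprint is reflected in the
   * quota usage soon after the compaction completes.
   */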
  @Test
  public void testMajorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // Write the same data again, flushing it to a second file
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // After two flushes, both hfiles contain similar data. We should see 2x the data.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2L * sizePerBatch;
      }
    });

    // Rewrite the two files into one.
    admin.majorCompact(tn);

    // After we major compact the table, we should quickly see that the amount of data in the
    // table is much closer to reality (the duplicate entries across the two files are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
      }
    });
  }

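  /**
   * Verifies that a minor compaction's reduced file count and size are reflected in the
   * quota usage soon after the compaction completes.
   */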
  @Test
  public void testMinorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final long numBatches = 6;
    for (long i = 0; i < numBatches; i++) {
      helper.writeData(tn, sizePerBatch);
      admin.flush(tn);
    }

    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    long numFiles = getNumHFilesForRegion(region);
    assertEquals(numBatches, numFiles);

    // After six flushes, each hfile contains similar data. We should see numFiles times the data.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFiles * sizePerBatch;
      }
    });

    // Rewrite some files into fewer
    TEST_UTIL.compact(tn, false);
    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);

    // After we minor compact the table, we should quickly see that the amount of data in the
    // table is much closer to reality (duplicate entries across the compacted files are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch &&
            snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
      }
    });
  }

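  /** Sums the number of HFiles across all stores of the given region. */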
  private long getNumHFilesForRegion(HRegion region) {
    return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum();
  }

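  /**
   * Verifies that a bulk load reports the loaded file sizes to the quota machinery without
   * waiting on a filesystem scan.
   */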
  @Test
  public void testBulkLoading() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
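    // Disable compactions so the bulk-loaded files are not rewritten while we verify the usage.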
    admin.compactionSwitch(false,
      admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));

    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 3, 550);
    // Make sure the files are about as long as we expect
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    FileStatus[] files = fs.listStatus(
        new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
    long totalSize = 0;
    for (FileStatus file : files) {
      assertTrue(
          "Expected the file " + file.getPath() + " to be larger than 25KB, but it was "
              + file.getLen(),
          file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
      totalSize += file.getLen();
    }

    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
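    // Execute the bulk load via the retrying caller.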
    caller.callWithRetries(callable, Integer.MAX_VALUE);

    final long finalTotalSize = totalSize;
    try {
      TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
        @Override
        boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
          return snapshot.getUsage() >= finalTotalSize;
        }
      });
    } finally {
      admin.compactionSwitch(true,
        admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
    }
  }

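  /**
   * Verifies that snapshot sizes count against the table's quota: after the snapshotted file
   * is archived by a compaction, the snapshot's size must still contribute to the usage.
   */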
  @Test
  public void testSnapshotSizes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

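    // Snapshot without forcing another flush; it references the store file from the flush above.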
    final String snapshot1 = "snapshot1";
    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);

    // Compute the size of the file for the Region we'll send to archive
    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    List<? extends Store> stores = region.getStores();
    long sum = 0;
    for (Store store : stores) {
      sum += store.getStorefilesSize();
    }
    final long storeFileSize = sum;

    // Wait for the table to show the usage
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() == storeFileSize;
      }
    });

    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
    // to test in the absence of this chore.
    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
        .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));

    // Force a major compaction to create a new file and push the old file to the archive
    TEST_UTIL.compact(tn, true);

    // After the old file moves to archive/, the space usage of this table should double:
    // the major compaction created a new file referenced by the table, and the snapshot still
    // references the old file.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2 * storeFileSize;
      }
    });

    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the original store file",
          storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));

      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the original store file",
          storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
    }
  }
}