/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

@Category({ MediumTests.class })
public class TestLowLatencySpaceQuotas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Global for all tests in the class
  private static final AtomicLong COUNTER = new AtomicLong(0);

  @Rule
  public TestName testName = new TestName();
  private SpaceQuotaHelperForTests helper;
  private Connection conn;
  private Admin admin;

  @BeforeClass
  public static void setup() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The default 1s period for QuotaObserverChore is good.
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    // Set the period/delay to read region size from HDFS to be very long
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
    // Set the same long period/delay to compute snapshot sizes
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
    // Clean up the compacted files faster than normal (5s instead of 2mins)
    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);

    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void removeAllQuotas() throws Exception {
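    // Create a fresh helper for each test and wait for the quota table to come online.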
    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
    conn = TEST_UTIL.getConnection();
    admin = TEST_UTIL.getAdmin();
    helper.waitForQuotaTable(conn);
  }

  @Test
  public void testFlushes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data
    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, initialSize);

    // Make sure a flush happened
    admin.flush(tn);

    // We should be able to observe the system recording an increase in size (even
    // though we know the filesystem scanning did not happen).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= initialSize;
      }
    });
  }

  @Test
  public void testMajorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // Write the same data again, flushing it to a second file
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // After two flushes, both hfiles contain nearly identical data, so we should see 2x the data.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2L * sizePerBatch;
      }
    });

    // Rewrite the two files into one.
    admin.majorCompact(tn);

    // After we major compact the table, we should notice quickly that the amount of data in the
    // table is much closer to reality (the duplicate entries across the two files are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
      }
    });
  }

  @Test
  public void testMinorCompaction() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final long numBatches = 6;
    for (long i = 0; i < numBatches; i++) {
      helper.writeData(tn, sizePerBatch);
      admin.flush(tn);
    }

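    // Sanity check: each flush should have produced exactly one hfile.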
    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    long numFiles = getNumHFilesForRegion(region);
    assertEquals(numBatches, numFiles);

    // After six flushes, each batch sits in its own hfile, so the reported usage should be at
    // least numFiles * sizePerBatch.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFiles * sizePerBatch;
      }
    });

    // Rewrite some files into fewer
    TEST_UTIL.compact(tn, false);
    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);

    // After the minor compaction, we should quickly see the reported usage drop to roughly the
    // size of the remaining files (duplicate entries across the compacted files are removed).
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch
          && snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
      }
    });
  }

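  /** Sums the number of HFiles across all stores of the given region. */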
  private long getNumHFilesForRegion(HRegion region) {
    return region.getStores().stream().mapToLong(s -> s.getNumHFiles()).sum();
  }

  @Test
  public void testBulkLoading() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
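    // Disable compactions on all region servers so the bulk-loaded files are not rewritten
    // before we can measure them; they are re-enabled in the finally block below.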
    admin.compactionSwitch(false,
      admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));

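    // Stage HFiles to bulk load; the helper returns a callable that performs the load when
    // invoked below.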
    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 3, 550);
    // Make sure the files are about as long as we expect
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    FileStatus[] files =
      fs.listStatus(new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
    long totalSize = 0;
    for (FileStatus file : files) {
      assertTrue(
        "Expected the file " + file.getPath() + " to be larger than 25KB, but it was "
          + file.getLen(),
        file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
      totalSize += file.getLen();
    }

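    // Execute the bulk load through the standard retrying RPC machinery.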
    final ClusterConnection clusterConn = (ClusterConnection) conn;
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(),
      clusterConn.getConnectionConfiguration());
    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
    caller.callWithRetries(callable, Integer.MAX_VALUE);

    final long finalTotalSize = totalSize;
    try {
      TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
        @Override
        boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
          return snapshot.getUsage() >= finalTotalSize;
        }
      });
    } finally {
      admin.compactionSwitch(true,
        admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList()));
    }
  }

  @Test
  public void testSnapshotSizes() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

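    // Take a snapshot without flushing again; the data above was already flushed to disk.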
    final String snapshot1 = "snapshot1";
    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);

    // Compute the size of the Region's store files; the major compaction below will send them
    // to the archive.
    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    List<? extends Store> stores = region.getStores();
    long sum = 0;
    for (Store store : stores) {
      sum += store.getStorefilesSize();
    }
    final long storeFileSize = sum;

    // Wait for the table to show the usage
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() == storeFileSize;
      }
    });

    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
    // to test in the absence of this chore.
    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
      .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));

    // Force a major compaction to create a new file and push the old file to the archive
    TEST_UTIL.compact(tn, true);

    // After moving the old file to the archive, the usage of this table should double: the
    // table references a new file created by the major compaction, while the snapshot still
    // references the old file.
    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
      @Override
      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
        return snapshot.getUsage() >= 2 * storeFileSize;
      }
    });

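    // Verify the sizes recorded in the quota table: one entry for the snapshot itself and one
    // rolled up for the namespace.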
    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the origin store file", storeFileSize,
        QuotaTableUtil.parseSnapshotSize(r.current()));

      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the origin store file", storeFileSize,
        QuotaTableUtil.parseSnapshotSize(r.current()));
    }
  }
}