001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Map;
031import java.util.Map.Entry;
032import java.util.Set;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.NamespaceNotFoundException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.master.HMaster;
044import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
058import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
061
062/**
063 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster.
064 */
065@Category(LargeTests.class)
066public class TestQuotaObserverChoreWithMiniCluster {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestQuotaObserverChoreWithMiniCluster.class);
071
072  private static final Logger LOG =
073      LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class);
074  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075  private static final AtomicLong COUNTER = new AtomicLong(0);
076  private static final long DEFAULT_WAIT_MILLIS = 500;
077
078  @Rule
079  public TestName testName = new TestName();
080
081  private HMaster master;
082  private QuotaObserverChore chore;
083  private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
084  private SpaceQuotaHelperForTests helper;
085
086  @BeforeClass
087  public static void setUp() throws Exception {
088    Configuration conf = TEST_UTIL.getConfiguration();
089    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
090    conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
091        SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
092    TEST_UTIL.startMiniCluster(1);
093  }
094
095  @AfterClass
096  public static void tearDown() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void removeAllQuotas() throws Exception {
102    final Connection conn = TEST_UTIL.getConnection();
103    if (helper == null) {
104      helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
105    }
106    // Wait for the quota table to be created
107    if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
108      helper.waitForQuotaTable(conn);
109    } else {
110      // Or, clean up any quotas from previous test runs.
111      helper.removeAllQuotas(conn);
112      assertEquals(0, helper.listNumDefinedQuotas(conn));
113    }
114
115    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
116    snapshotNotifier =
117        (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier();
118    assertNotNull(snapshotNotifier);
119    snapshotNotifier.clearSnapshots();
120    chore = master.getQuotaObserverChore();
121  }
122
123  @Test
124  public void testTableViolatesQuota() throws Exception {
125    TableName tn = helper.createTableWithRegions(10);
126
127    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
128    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
129    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
130    TEST_UTIL.getAdmin().setQuota(settings);
131
132    // Write more data than should be allowed
133    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
134
135    Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots();
136    boolean foundSnapshot = false;
137    while (!foundSnapshot) {
138      if (quotaSnapshots.isEmpty()) {
139        LOG.info("Found no violated quotas, sleeping and retrying. Current reports: "
140            + master.getMasterQuotaManager().snapshotRegionSizes());
141        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
142        quotaSnapshots = snapshotNotifier.copySnapshots();
143      } else {
144        Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet());
145        assertEquals(tn, entry.getKey());
146        final SpaceQuotaSnapshot snapshot = entry.getValue();
147        if (!snapshot.getQuotaStatus().isInViolation()) {
148          LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot);
149          sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
150          quotaSnapshots = snapshotNotifier.copySnapshots();
151        } else {
152          foundSnapshot = true;
153        }
154      }
155    }
156
157    Entry<TableName, SpaceQuotaSnapshot> entry =
158      Iterables.getOnlyElement(quotaSnapshots.entrySet());
159    assertEquals(tn, entry.getKey());
160    final SpaceQuotaSnapshot snapshot = entry.getValue();
161    assertEquals("Snapshot was " + snapshot, violationPolicy,
162      snapshot.getQuotaStatus().getPolicy().get());
163    assertEquals(sizeLimit, snapshot.getLimit());
164    assertTrue("The usage should be greater than the limit, but were " + snapshot.getUsage() +
165      " and " + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit());
166  }
167
168  @Test
169  public void testNamespaceViolatesQuota() throws Exception {
170    final String namespace = testName.getMethodName();
171    final Admin admin = TEST_UTIL.getAdmin();
172    // Ensure the namespace exists
173    try {
174      admin.getNamespaceDescriptor(namespace);
175    } catch (NamespaceNotFoundException e) {
176      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
177      admin.createNamespace(desc);
178    }
179
180    TableName tn1 = helper.createTableWithRegions(namespace, 5);
181    TableName tn2 = helper.createTableWithRegions(namespace, 5);
182    TableName tn3 = helper.createTableWithRegions(namespace, 5);
183
184    final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
185    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
186    QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
187    admin.setQuota(settings);
188
189    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
190    admin.flush(tn1);
191    Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
192    for (int i = 0; i < 5; i++) {
193      // Check a few times to make sure we don't prematurely move to violation
194      assertEquals(
195          "Should not see any quota violations after writing 2MB of data", 0,
196          numSnapshotsInViolation(snapshots));
197      try {
198        Thread.sleep(DEFAULT_WAIT_MILLIS);
199      } catch (InterruptedException e) {
200        LOG.debug("Interrupted while sleeping." , e);
201      }
202      snapshots = snapshotNotifier.copySnapshots();
203    }
204
205    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
206    admin.flush(tn2);
207    snapshots = snapshotNotifier.copySnapshots();
208    for (int i = 0; i < 5; i++) {
209      // Check a few times to make sure we don't prematurely move to violation
210      assertEquals("Should not see any quota violations after writing 4MB of data", 0,
211          numSnapshotsInViolation(snapshots));
212      try {
213        Thread.sleep(DEFAULT_WAIT_MILLIS);
214      } catch (InterruptedException e) {
215        LOG.debug("Interrupted while sleeping." , e);
216      }
217      snapshots = snapshotNotifier.copySnapshots();
218    }
219
220    // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
221    // and should push all three tables in the namespace into violation.
222    helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
223    admin.flush(tn3);
224    snapshots = snapshotNotifier.copySnapshots();
225    while (numSnapshotsInViolation(snapshots) < 3) {
226      LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots
227          + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
228      try {
229        Thread.sleep(DEFAULT_WAIT_MILLIS);
230      } catch (InterruptedException e) {
231        LOG.debug("Interrupted while sleeping.", e);
232        Thread.currentThread().interrupt();
233      }
234      snapshots = snapshotNotifier.copySnapshots();
235    }
236
237    SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
238    assertNotNull("tn1 should be in violation", snapshot1);
239    assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get());
240    SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
241    assertNotNull("tn2 should be in violation", snapshot2);
242    assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get());
243    SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
244    assertNotNull("tn3 should be in violation", snapshot3);
245    assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get());
246    assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
247  }
248
249  @Test
250  public void testTableQuotaOverridesNamespaceQuota() throws Exception {
251    final String namespace = testName.getMethodName();
252    final Admin admin = TEST_UTIL.getAdmin();
253    // Ensure the namespace exists
254    try {
255      admin.getNamespaceDescriptor(namespace);
256    } catch (NamespaceNotFoundException e) {
257      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
258      admin.createNamespace(desc);
259    }
260
261    TableName tn1 = helper.createTableWithRegions(namespace, 5);
262    TableName tn2 = helper.createTableWithRegions(namespace, 5);
263
264    final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
265    final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
266    QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
267        namespaceSizeLimit, namespaceViolationPolicy);
268    admin.setQuota(namespaceSettings);
269
270    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
271    admin.flush(tn1);
272    Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
273    for (int i = 0; i < 5; i++) {
274      // Check a few times to make sure we don't prematurely move to violation
275      assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0,
276          numSnapshotsInViolation(snapshots));
277      try {
278        Thread.sleep(DEFAULT_WAIT_MILLIS);
279      } catch (InterruptedException e) {
280        LOG.debug("Interrupted while sleeping." , e);
281      }
282      snapshots = snapshotNotifier.copySnapshots();
283    }
284
285    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
286    admin.flush(tn2);
287    snapshots = snapshotNotifier.copySnapshots();
288    while (numSnapshotsInViolation(snapshots) < 2) {
289      LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots
290          + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
291      try {
292        Thread.sleep(DEFAULT_WAIT_MILLIS);
293      } catch (InterruptedException e) {
294        LOG.debug("Interrupted while sleeping.", e);
295        Thread.currentThread().interrupt();
296      }
297      snapshots = snapshotNotifier.copySnapshots();
298    }
299
300    SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
301    assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
302    assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get());
303    SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
304    assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
305    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
306
307    // Override the namespace quota with a table quota
308    final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
309    final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
310    QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit,
311        tableViolationPolicy);
312    admin.setQuota(tableSettings);
313
314    // Keep checking for the table quota policy to override the namespace quota
315    while (true) {
316      snapshots = snapshotNotifier.copySnapshots();
317      SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
318      assertNotNull("Violation policy should never be null", actualTableSnapshot);
319      if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) {
320        LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
321        try {
322          Thread.sleep(DEFAULT_WAIT_MILLIS);
323        } catch (InterruptedException e) {
324          LOG.debug("Interrupted while sleeping");
325          Thread.currentThread().interrupt();
326        }
327        continue;
328      }
329      assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get());
330      break;
331    }
332
333    // This should not change with the introduction of the table quota for tn1
334    actualPolicyTN2 = snapshots.get(tn2);
335    assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
336    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
337  }
338
339  @Test
340  public void testGetAllTablesWithQuotas() throws Exception {
341    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
342    Set<TableName> tablesWithQuotas = new HashSet<>();
343    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
344    // Partition the tables with quotas by table and ns quota
345    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
346
347    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
348    assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
349    assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
350  }
351
352  @Test
353  public void testRpcQuotaTablesAreFiltered() throws Exception {
354    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
355    Set<TableName> tablesWithQuotas = new HashSet<>();
356    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
357    // Partition the tables with quotas by table and ns quota
358    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
359
360    TableName rpcQuotaTable = helper.createTable();
361    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory
362      .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
363
364    // The `rpcQuotaTable` should not be included in this Set
365    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
366    assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
367    assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
368  }
369
370  @Test
371  public void testFilterRegions() throws Exception {
372    Map<TableName,Integer> mockReportedRegions = new HashMap<>();
373    // Can't mock because of primitive int as a return type -- Mockito
374    // can only handle an Integer.
375    TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(),
376        TEST_UTIL.getConfiguration()) {
377      @Override
378      int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) {
379        Integer i = mockReportedRegions.get(table);
380        if (i == null) {
381          return 0;
382        }
383        return i;
384      }
385    };
386
387    // Create the tables
388    TableName tn1 = helper.createTableWithRegions(20);
389    TableName tn2 = helper.createTableWithRegions(20);
390    TableName tn3 = helper.createTableWithRegions(20);
391
392    // Add them to the Tables with Quotas object
393    tables.addTableQuotaTable(tn1);
394    tables.addTableQuotaTable(tn2);
395    tables.addTableQuotaTable(tn3);
396
397    // Mock the number of regions reported
398    mockReportedRegions.put(tn1, 10); // 50%
399    mockReportedRegions.put(tn2, 19); // 95%
400    mockReportedRegions.put(tn3, 20); // 100%
401
402    // Argument is un-used
403    tables.filterInsufficientlyReportedTables(null);
404    // The default of 95% reported should prevent tn1 from appearing
405    assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
406  }
407
408  @Test
409  public void testFetchSpaceQuota() throws Exception {
410    Multimap<TableName,QuotaSettings> tables = helper.createTablesWithSpaceQuotas();
411    // Can pass in an empty map, we're not consulting it.
412    chore.initializeSnapshotStores(Collections.emptyMap());
413    // All tables that were created should have a quota defined.
414    for (Entry<TableName,QuotaSettings> entry : tables.entries()) {
415      final TableName table = entry.getKey();
416      final QuotaSettings qs = entry.getValue();
417
418      assertTrue("QuotaSettings was an instance of " + qs.getClass(),
419          qs instanceof SpaceLimitSettings);
420
421      SpaceQuota spaceQuota = null;
422      if (qs.getTableName() != null) {
423        spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
424        assertNotNull("Could not find table space quota for " + table, spaceQuota);
425      } else if (qs.getNamespace() != null) {
426        spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
427        assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota);
428      } else {
429        fail("Expected table or namespace space quota");
430      }
431
432      final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
433      assertEquals(sls.getProto().getQuota(), spaceQuota);
434    }
435
436    TableName tableWithoutQuota = helper.createTable();
437    assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
438  }
439
440  private int numSnapshotsInViolation(Map<TableName,SpaceQuotaSnapshot> snapshots) {
441    int sum = 0;
442    for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
443      if (snapshot.getQuotaStatus().isInViolation()) {
444        sum++;
445      }
446    }
447    return sum;
448  }
449
450  private void sleepWithInterrupt(long millis) {
451    try {
452      Thread.sleep(millis);
453    } catch (InterruptedException e) {
454      LOG.debug("Interrupted while sleeping");
455      Thread.currentThread().interrupt();
456    }
457  }
458}