001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Map;
031import java.util.Map.Entry;
032import java.util.Set;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.NamespaceDescriptor;
038import org.apache.hadoop.hbase.NamespaceNotFoundException;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.master.HMaster;
043import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.BeforeAll;
047import org.junit.jupiter.api.BeforeEach;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.Test;
050import org.junit.jupiter.api.TestInfo;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
055import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
058
059/**
060 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster.
061 */
062@Tag(LargeTests.TAG)
063public class TestQuotaObserverChoreWithMiniCluster {
064
065  private static final Logger LOG =
066    LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class);
067  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
068  private static final AtomicLong COUNTER = new AtomicLong(0);
069  private static final long DEFAULT_WAIT_MILLIS = 500;
070
071  private HMaster master;
072  private QuotaObserverChore chore;
073  private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
074  private SpaceQuotaHelperForTests helper;
075
076  @BeforeAll
077  public static void setUp() throws Exception {
078    Configuration conf = TEST_UTIL.getConfiguration();
079    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
080    conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
081      SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
082    TEST_UTIL.startMiniCluster(1);
083  }
084
085  @AfterAll
086  public static void tearDown() throws Exception {
087    TEST_UTIL.shutdownMiniCluster();
088  }
089
090  @BeforeEach
091  public void removeAllQuotas(TestInfo testInfo) throws Exception {
092    final Connection conn = TEST_UTIL.getConnection();
093    if (helper == null) {
094      helper = new SpaceQuotaHelperForTests(TEST_UTIL,
095        () -> testInfo.getTestMethod().get().getName(), COUNTER);
096    }
097    // Wait for the quota table to be created
098    if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
099      helper.waitForQuotaTable(conn);
100    } else {
101      // Or, clean up any quotas from previous test runs.
102      helper.removeAllQuotas(conn);
103      assertEquals(0, helper.listNumDefinedQuotas(conn));
104    }
105
106    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
107    snapshotNotifier = (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier();
108    assertNotNull(snapshotNotifier);
109    snapshotNotifier.clearSnapshots();
110    chore = master.getQuotaObserverChore();
111  }
112
113  @Test
114  public void testTableViolatesQuota() throws Exception {
115    TableName tn = helper.createTableWithRegions(10);
116
117    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
118    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
119    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
120    TEST_UTIL.getAdmin().setQuota(settings);
121
122    // Write more data than should be allowed
123    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
124
125    Map<TableName, SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots();
126    boolean foundSnapshot = false;
127    while (!foundSnapshot) {
128      if (quotaSnapshots.isEmpty()) {
129        LOG.info("Found no violated quotas, sleeping and retrying. Current reports: "
130          + master.getMasterQuotaManager().snapshotRegionSizes());
131        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
132        quotaSnapshots = snapshotNotifier.copySnapshots();
133      } else {
134        Entry<TableName, SpaceQuotaSnapshot> entry =
135          Iterables.getOnlyElement(quotaSnapshots.entrySet());
136        assertEquals(tn, entry.getKey());
137        final SpaceQuotaSnapshot snapshot = entry.getValue();
138        if (!snapshot.getQuotaStatus().isInViolation()) {
139          LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot);
140          sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
141          quotaSnapshots = snapshotNotifier.copySnapshots();
142        } else {
143          foundSnapshot = true;
144        }
145      }
146    }
147
148    Entry<TableName, SpaceQuotaSnapshot> entry =
149      Iterables.getOnlyElement(quotaSnapshots.entrySet());
150    assertEquals(tn, entry.getKey());
151    final SpaceQuotaSnapshot snapshot = entry.getValue();
152    assertEquals(violationPolicy, snapshot.getQuotaStatus().getPolicy().get(),
153      "Snapshot was " + snapshot);
154    assertEquals(sizeLimit, snapshot.getLimit());
155    assertTrue(snapshot.getUsage() > snapshot.getLimit(),
156      "The usage should be greater than the limit, but were " + snapshot.getUsage() + " and "
157        + snapshot.getLimit() + ", respectively");
158  }
159
160  @Test
161  public void testNamespaceViolatesQuota(TestInfo testInfo) throws Exception {
162    final String namespace = testInfo.getTestMethod().get().getName();
163    final Admin admin = TEST_UTIL.getAdmin();
164    // Ensure the namespace exists
165    try {
166      admin.getNamespaceDescriptor(namespace);
167    } catch (NamespaceNotFoundException e) {
168      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
169      admin.createNamespace(desc);
170    }
171
172    TableName tn1 = helper.createTableWithRegions(namespace, 5);
173    TableName tn2 = helper.createTableWithRegions(namespace, 5);
174    TableName tn3 = helper.createTableWithRegions(namespace, 5);
175
176    final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
177    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
178    QuotaSettings settings =
179      QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
180    admin.setQuota(settings);
181
182    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
183    admin.flush(tn1);
184    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
185    for (int i = 0; i < 5; i++) {
186      // Check a few times to make sure we don't prematurely move to violation
187      assertEquals(0, numSnapshotsInViolation(snapshots),
188        "Should not see any quota violations after writing 2MB of data");
189      try {
190        Thread.sleep(DEFAULT_WAIT_MILLIS);
191      } catch (InterruptedException e) {
192        LOG.debug("Interrupted while sleeping.", e);
193      }
194      snapshots = snapshotNotifier.copySnapshots();
195    }
196
197    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
198    admin.flush(tn2);
199    snapshots = snapshotNotifier.copySnapshots();
200    for (int i = 0; i < 5; i++) {
201      // Check a few times to make sure we don't prematurely move to violation
202      assertEquals(0, numSnapshotsInViolation(snapshots),
203        "Should not see any quota violations after writing 4MB of data");
204      try {
205        Thread.sleep(DEFAULT_WAIT_MILLIS);
206      } catch (InterruptedException e) {
207        LOG.debug("Interrupted while sleeping.", e);
208      }
209      snapshots = snapshotNotifier.copySnapshots();
210    }
211
212    // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
213    // and should push all three tables in the namespace into violation.
214    helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
215    admin.flush(tn3);
216    snapshots = snapshotNotifier.copySnapshots();
217    while (numSnapshotsInViolation(snapshots) < 3) {
218      LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots
219        + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
220      try {
221        Thread.sleep(DEFAULT_WAIT_MILLIS);
222      } catch (InterruptedException e) {
223        LOG.debug("Interrupted while sleeping.", e);
224        Thread.currentThread().interrupt();
225      }
226      snapshots = snapshotNotifier.copySnapshots();
227    }
228
229    SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
230    assertNotNull(snapshot1, "tn1 should be in violation");
231    assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get());
232    SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
233    assertNotNull(snapshot2, "tn2 should be in violation");
234    assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get());
235    SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
236    assertNotNull(snapshot3, "tn3 should be in violation");
237    assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get());
238    assertTrue(snapshots.isEmpty(), "Unexpected additional quota violations: " + snapshots);
239  }
240
241  @Test
242  public void testTableQuotaOverridesNamespaceQuota(TestInfo testInfo) throws Exception {
243    final String namespace = testInfo.getTestMethod().get().getName();
244    final Admin admin = TEST_UTIL.getAdmin();
245    // Ensure the namespace exists
246    try {
247      admin.getNamespaceDescriptor(namespace);
248    } catch (NamespaceNotFoundException e) {
249      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
250      admin.createNamespace(desc);
251    }
252
253    TableName tn1 = helper.createTableWithRegions(namespace, 5);
254    TableName tn2 = helper.createTableWithRegions(namespace, 5);
255
256    final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
257    final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
258    QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
259      namespaceSizeLimit, namespaceViolationPolicy);
260    admin.setQuota(namespaceSettings);
261
262    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
263    admin.flush(tn1);
264    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
265    for (int i = 0; i < 5; i++) {
266      // Check a few times to make sure we don't prematurely move to violation
267      assertEquals(0, numSnapshotsInViolation(snapshots),
268        "Should not see any quota violations after writing 2MB of data: " + snapshots);
269      try {
270        Thread.sleep(DEFAULT_WAIT_MILLIS);
271      } catch (InterruptedException e) {
272        LOG.debug("Interrupted while sleeping.", e);
273      }
274      snapshots = snapshotNotifier.copySnapshots();
275    }
276
277    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
278    admin.flush(tn2);
279    snapshots = snapshotNotifier.copySnapshots();
280    while (numSnapshotsInViolation(snapshots) < 2) {
281      LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots
282        + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
283      try {
284        Thread.sleep(DEFAULT_WAIT_MILLIS);
285      } catch (InterruptedException e) {
286        LOG.debug("Interrupted while sleeping.", e);
287        Thread.currentThread().interrupt();
288      }
289      snapshots = snapshotNotifier.copySnapshots();
290    }
291
292    SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
293    assertNotNull(actualPolicyTN1, "Expected to see violation policy for tn1");
294    assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get());
295    SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
296    assertNotNull(actualPolicyTN2, "Expected to see violation policy for tn2");
297    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
298
299    // Override the namespace quota with a table quota
300    final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
301    final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
302    QuotaSettings tableSettings =
303      QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, tableViolationPolicy);
304    admin.setQuota(tableSettings);
305
306    // Keep checking for the table quota policy to override the namespace quota
307    while (true) {
308      snapshots = snapshotNotifier.copySnapshots();
309      SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
310      assertNotNull(actualTableSnapshot, "Violation policy should never be null");
311      if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) {
312        LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
313        try {
314          Thread.sleep(DEFAULT_WAIT_MILLIS);
315        } catch (InterruptedException e) {
316          LOG.debug("Interrupted while sleeping");
317          Thread.currentThread().interrupt();
318        }
319        continue;
320      }
321      assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get());
322      break;
323    }
324
325    // This should not change with the introduction of the table quota for tn1
326    actualPolicyTN2 = snapshots.get(tn2);
327    assertNotNull(actualPolicyTN2, "Expected to see violation policy for tn2");
328    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
329  }
330
331  @Test
332  public void testGetAllTablesWithQuotas() throws Exception {
333    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
334    Set<TableName> tablesWithQuotas = new HashSet<>();
335    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
336    // Partition the tables with quotas by table and ns quota
337    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
338
339    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
340    assertEquals(tablesWithQuotas, tables.getTableQuotaTables(), "Found tables: " + tables);
341    assertEquals(namespaceTablesWithQuotas, tables.getNamespaceQuotaTables(),
342      "Found tables: " + tables);
343  }
344
345  @Test
346  public void testRpcQuotaTablesAreFiltered() throws Exception {
347    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
348    Set<TableName> tablesWithQuotas = new HashSet<>();
349    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
350    // Partition the tables with quotas by table and ns quota
351    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
352
353    TableName rpcQuotaTable = helper.createTable();
354    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.throttleTable(rpcQuotaTable,
355      ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
356
357    // The `rpcQuotaTable` should not be included in this Set
358    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
359    assertEquals(tablesWithQuotas, tables.getTableQuotaTables(), "Found tables: " + tables);
360    assertEquals(namespaceTablesWithQuotas, tables.getNamespaceQuotaTables(),
361      "Found tables: " + tables);
362  }
363
364  @Test
365  public void testFilterRegions() throws Exception {
366    Map<TableName, Integer> mockReportedRegions = new HashMap<>();
367    // Can't mock because of primitive int as a return type -- Mockito
368    // can only handle an Integer.
369    TablesWithQuotas tables =
370      new TablesWithQuotas(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()) {
371        @Override
372        int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) {
373          Integer i = mockReportedRegions.get(table);
374          if (i == null) {
375            return 0;
376          }
377          return i;
378        }
379      };
380
381    // Create the tables
382    TableName tn1 = helper.createTableWithRegions(20);
383    TableName tn2 = helper.createTableWithRegions(20);
384    TableName tn3 = helper.createTableWithRegions(20);
385
386    // Add them to the Tables with Quotas object
387    tables.addTableQuotaTable(tn1);
388    tables.addTableQuotaTable(tn2);
389    tables.addTableQuotaTable(tn3);
390
391    // Mock the number of regions reported
392    mockReportedRegions.put(tn1, 10); // 50%
393    mockReportedRegions.put(tn2, 19); // 95%
394    mockReportedRegions.put(tn3, 20); // 100%
395
396    // Argument is un-used
397    tables.filterInsufficientlyReportedTables(null);
398    // The default of 95% reported should prevent tn1 from appearing
399    assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
400  }
401
402  @Test
403  public void testFetchSpaceQuota() throws Exception {
404    Multimap<TableName, QuotaSettings> tables = helper.createTablesWithSpaceQuotas();
405    // Can pass in an empty map, we're not consulting it.
406    chore.initializeSnapshotStores(Collections.emptyMap());
407    // All tables that were created should have a quota defined.
408    for (Entry<TableName, QuotaSettings> entry : tables.entries()) {
409      final TableName table = entry.getKey();
410      final QuotaSettings qs = entry.getValue();
411
412      assertTrue(qs instanceof SpaceLimitSettings,
413        "QuotaSettings was an instance of " + qs.getClass());
414
415      SpaceQuota spaceQuota = null;
416      if (qs.getTableName() != null) {
417        spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
418        assertNotNull(spaceQuota, "Could not find table space quota for " + table);
419      } else if (qs.getNamespace() != null) {
420        spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
421        assertNotNull(spaceQuota,
422          "Could not find namespace space quota for " + table.getNamespaceAsString());
423      } else {
424        fail("Expected table or namespace space quota");
425      }
426
427      final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
428      assertEquals(sls.getProto().getQuota(), spaceQuota);
429    }
430
431    TableName tableWithoutQuota = helper.createTable();
432    assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
433  }
434
435  private int numSnapshotsInViolation(Map<TableName, SpaceQuotaSnapshot> snapshots) {
436    int sum = 0;
437    for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
438      if (snapshot.getQuotaStatus().isInViolation()) {
439        sum++;
440      }
441    }
442    return sum;
443  }
444
445  private void sleepWithInterrupt(long millis) {
446    try {
447      Thread.sleep(millis);
448    } catch (InterruptedException e) {
449      LOG.debug("Interrupted while sleeping");
450      Thread.currentThread().interrupt();
451    }
452  }
453}