001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Map;
031import java.util.Map.Entry;
032import java.util.Set;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.NamespaceNotFoundException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.master.HMaster;
044import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
058import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
061
062/**
063 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster.
064 */
065@Category(LargeTests.class)
066public class TestQuotaObserverChoreWithMiniCluster {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestQuotaObserverChoreWithMiniCluster.class);
071
072  private static final Logger LOG =
073      LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class);
074  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075  private static final AtomicLong COUNTER = new AtomicLong(0);
076  private static final long DEFAULT_WAIT_MILLIS = 500;
077
078  @Rule
079  public TestName testName = new TestName();
080
081  private HMaster master;
082  private QuotaObserverChore chore;
083  private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
084  private SpaceQuotaHelperForTests helper;
085
086  @BeforeClass
087  public static void setUp() throws Exception {
088    Configuration conf = TEST_UTIL.getConfiguration();
089    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
090    conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
091        SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
092    TEST_UTIL.startMiniCluster(1);
093  }
094
095  @AfterClass
096  public static void tearDown() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void removeAllQuotas() throws Exception {
102    final Connection conn = TEST_UTIL.getConnection();
103    if (helper == null) {
104      helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
105    }
106    // Wait for the quota table to be created
107    if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
108      helper.waitForQuotaTable(conn);
109    } else {
110      // Or, clean up any quotas from previous test runs.
111      helper.removeAllQuotas(conn);
112      assertEquals(0, helper.listNumDefinedQuotas(conn));
113    }
114
115    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
116    snapshotNotifier =
117        (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier();
118    assertNotNull(snapshotNotifier);
119    snapshotNotifier.clearSnapshots();
120    chore = master.getQuotaObserverChore();
121  }
122
123  @Test
124  public void testTableViolatesQuota() throws Exception {
125    TableName tn = helper.createTableWithRegions(10);
126
127    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
128    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
129    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
130    TEST_UTIL.getAdmin().setQuota(settings);
131
132    // Write more data than should be allowed
133    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
134
135    Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots();
136    boolean foundSnapshot = false;
137    while (!foundSnapshot) {
138      if (quotaSnapshots.isEmpty()) {
139        LOG.info("Found no violated quotas, sleeping and retrying. Current reports: "
140            + master.getMasterQuotaManager().snapshotRegionSizes());
141        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
142        quotaSnapshots = snapshotNotifier.copySnapshots();
143      } else {
144        Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet());
145        assertEquals(tn, entry.getKey());
146        final SpaceQuotaSnapshot snapshot = entry.getValue();
147        if (!snapshot.getQuotaStatus().isInViolation()) {
148          LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot);
149          sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
150          quotaSnapshots = snapshotNotifier.copySnapshots();
151        } else {
152          foundSnapshot = true;
153        }
154      }
155    }
156
157    Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet());
158    assertEquals(tn, entry.getKey());
159    final SpaceQuotaSnapshot snapshot = entry.getValue();
160    assertEquals("Snapshot was " + snapshot, violationPolicy, snapshot.getQuotaStatus().getPolicy());
161    assertEquals(sizeLimit, snapshot.getLimit());
162    assertTrue(
163        "The usage should be greater than the limit, but were " + snapshot.getUsage() + " and "
164        + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit());
165  }
166
167  @Test
168  public void testNamespaceViolatesQuota() throws Exception {
169    final String namespace = testName.getMethodName();
170    final Admin admin = TEST_UTIL.getAdmin();
171    // Ensure the namespace exists
172    try {
173      admin.getNamespaceDescriptor(namespace);
174    } catch (NamespaceNotFoundException e) {
175      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
176      admin.createNamespace(desc);
177    }
178
179    TableName tn1 = helper.createTableWithRegions(namespace, 5);
180    TableName tn2 = helper.createTableWithRegions(namespace, 5);
181    TableName tn3 = helper.createTableWithRegions(namespace, 5);
182
183    final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
184    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
185    QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
186    admin.setQuota(settings);
187
188    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
189    admin.flush(tn1);
190    Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
191    for (int i = 0; i < 5; i++) {
192      // Check a few times to make sure we don't prematurely move to violation
193      assertEquals(
194          "Should not see any quota violations after writing 2MB of data", 0,
195          numSnapshotsInViolation(snapshots));
196      try {
197        Thread.sleep(DEFAULT_WAIT_MILLIS);
198      } catch (InterruptedException e) {
199        LOG.debug("Interrupted while sleeping." , e);
200      }
201      snapshots = snapshotNotifier.copySnapshots();
202    }
203
204    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
205    admin.flush(tn2);
206    snapshots = snapshotNotifier.copySnapshots();
207    for (int i = 0; i < 5; i++) {
208      // Check a few times to make sure we don't prematurely move to violation
209      assertEquals("Should not see any quota violations after writing 4MB of data", 0,
210          numSnapshotsInViolation(snapshots));
211      try {
212        Thread.sleep(DEFAULT_WAIT_MILLIS);
213      } catch (InterruptedException e) {
214        LOG.debug("Interrupted while sleeping." , e);
215      }
216      snapshots = snapshotNotifier.copySnapshots();
217    }
218
219    // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
220    // and should push all three tables in the namespace into violation.
221    helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
222    admin.flush(tn3);
223    snapshots = snapshotNotifier.copySnapshots();
224    while (numSnapshotsInViolation(snapshots) < 3) {
225      LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots
226          + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
227      try {
228        Thread.sleep(DEFAULT_WAIT_MILLIS);
229      } catch (InterruptedException e) {
230        LOG.debug("Interrupted while sleeping.", e);
231        Thread.currentThread().interrupt();
232      }
233      snapshots = snapshotNotifier.copySnapshots();
234    }
235
236    SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
237    assertNotNull("tn1 should be in violation", snapshot1);
238    assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy());
239    SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
240    assertNotNull("tn2 should be in violation", snapshot2);
241    assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy());
242    SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
243    assertNotNull("tn3 should be in violation", snapshot3);
244    assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy());
245    assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
246  }
247
248  @Test
249  public void testTableQuotaOverridesNamespaceQuota() throws Exception {
250    final String namespace = testName.getMethodName();
251    final Admin admin = TEST_UTIL.getAdmin();
252    // Ensure the namespace exists
253    try {
254      admin.getNamespaceDescriptor(namespace);
255    } catch (NamespaceNotFoundException e) {
256      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
257      admin.createNamespace(desc);
258    }
259
260    TableName tn1 = helper.createTableWithRegions(namespace, 5);
261    TableName tn2 = helper.createTableWithRegions(namespace, 5);
262
263    final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
264    final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
265    QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
266        namespaceSizeLimit, namespaceViolationPolicy);
267    admin.setQuota(namespaceSettings);
268
269    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
270    admin.flush(tn1);
271    Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
272    for (int i = 0; i < 5; i++) {
273      // Check a few times to make sure we don't prematurely move to violation
274      assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0,
275          numSnapshotsInViolation(snapshots));
276      try {
277        Thread.sleep(DEFAULT_WAIT_MILLIS);
278      } catch (InterruptedException e) {
279        LOG.debug("Interrupted while sleeping." , e);
280      }
281      snapshots = snapshotNotifier.copySnapshots();
282    }
283
284    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
285    admin.flush(tn2);
286    snapshots = snapshotNotifier.copySnapshots();
287    while (numSnapshotsInViolation(snapshots) < 2) {
288      LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots
289          + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
290      try {
291        Thread.sleep(DEFAULT_WAIT_MILLIS);
292      } catch (InterruptedException e) {
293        LOG.debug("Interrupted while sleeping.", e);
294        Thread.currentThread().interrupt();
295      }
296      snapshots = snapshotNotifier.copySnapshots();
297    }
298
299    SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
300    assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
301    assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy());
302    SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
303    assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
304    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
305
306    // Override the namespace quota with a table quota
307    final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
308    final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
309    QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit,
310        tableViolationPolicy);
311    admin.setQuota(tableSettings);
312
313    // Keep checking for the table quota policy to override the namespace quota
314    while (true) {
315      snapshots = snapshotNotifier.copySnapshots();
316      SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
317      assertNotNull("Violation policy should never be null", actualTableSnapshot);
318      if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy()) {
319        LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
320        try {
321          Thread.sleep(DEFAULT_WAIT_MILLIS);
322        } catch (InterruptedException e) {
323          LOG.debug("Interrupted while sleeping");
324          Thread.currentThread().interrupt();
325        }
326        continue;
327      }
328      assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy());
329      break;
330    }
331
332    // This should not change with the introduction of the table quota for tn1
333    actualPolicyTN2 = snapshots.get(tn2);
334    assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
335    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
336  }
337
338  @Test
339  public void testGetAllTablesWithQuotas() throws Exception {
340    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
341    Set<TableName> tablesWithQuotas = new HashSet<>();
342    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
343    // Partition the tables with quotas by table and ns quota
344    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
345
346    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
347    assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
348    assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
349  }
350
351  @Test
352  public void testRpcQuotaTablesAreFiltered() throws Exception {
353    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
354    Set<TableName> tablesWithQuotas = new HashSet<>();
355    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
356    // Partition the tables with quotas by table and ns quota
357    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
358
359    TableName rpcQuotaTable = helper.createTable();
360    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory
361      .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
362
363    // The `rpcQuotaTable` should not be included in this Set
364    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
365    assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
366    assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
367  }
368
369  @Test
370  public void testFilterRegions() throws Exception {
371    Map<TableName,Integer> mockReportedRegions = new HashMap<>();
372    // Can't mock because of primitive int as a return type -- Mockito
373    // can only handle an Integer.
374    TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(),
375        TEST_UTIL.getConfiguration()) {
376      @Override
377      int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) {
378        Integer i = mockReportedRegions.get(table);
379        if (i == null) {
380          return 0;
381        }
382        return i;
383      }
384    };
385
386    // Create the tables
387    TableName tn1 = helper.createTableWithRegions(20);
388    TableName tn2 = helper.createTableWithRegions(20);
389    TableName tn3 = helper.createTableWithRegions(20);
390
391    // Add them to the Tables with Quotas object
392    tables.addTableQuotaTable(tn1);
393    tables.addTableQuotaTable(tn2);
394    tables.addTableQuotaTable(tn3);
395
396    // Mock the number of regions reported
397    mockReportedRegions.put(tn1, 10); // 50%
398    mockReportedRegions.put(tn2, 19); // 95%
399    mockReportedRegions.put(tn3, 20); // 100%
400
401    // Argument is un-used
402    tables.filterInsufficientlyReportedTables(null);
403    // The default of 95% reported should prevent tn1 from appearing
404    assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
405  }
406
407  @Test
408  public void testFetchSpaceQuota() throws Exception {
409    Multimap<TableName,QuotaSettings> tables = helper.createTablesWithSpaceQuotas();
410    // Can pass in an empty map, we're not consulting it.
411    chore.initializeSnapshotStores(Collections.emptyMap());
412    // All tables that were created should have a quota defined.
413    for (Entry<TableName,QuotaSettings> entry : tables.entries()) {
414      final TableName table = entry.getKey();
415      final QuotaSettings qs = entry.getValue();
416
417      assertTrue("QuotaSettings was an instance of " + qs.getClass(),
418          qs instanceof SpaceLimitSettings);
419
420      SpaceQuota spaceQuota = null;
421      if (qs.getTableName() != null) {
422        spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
423        assertNotNull("Could not find table space quota for " + table, spaceQuota);
424      } else if (qs.getNamespace() != null) {
425        spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
426        assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota);
427      } else {
428        fail("Expected table or namespace space quota");
429      }
430
431      final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
432      assertEquals(sls.getProto().getQuota(), spaceQuota);
433    }
434
435    TableName tableWithoutQuota = helper.createTable();
436    assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
437  }
438
439  private int numSnapshotsInViolation(Map<TableName,SpaceQuotaSnapshot> snapshots) {
440    int sum = 0;
441    for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
442      if (snapshot.getQuotaStatus().isInViolation()) {
443        sum++;
444      }
445    }
446    return sum;
447  }
448
449  private void sleepWithInterrupt(long millis) {
450    try {
451      Thread.sleep(millis);
452    } catch (InterruptedException e) {
453      LOG.debug("Interrupted while sleeping");
454      Thread.currentThread().interrupt();
455    }
456  }
457}