001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
048
049/**
050 * Reads the currently received Region filesystem-space use reports and acts on those which
051 * violate a defined quota.
052 */
053@InterfaceAudience.Private
054public class QuotaObserverChore extends ScheduledChore {
055  private static final Logger LOG = LoggerFactory.getLogger(QuotaObserverChore.class);
056  static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY =
057      "hbase.master.quotas.observer.chore.period";
058  static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minutes in millis
059
060  static final String QUOTA_OBSERVER_CHORE_DELAY_KEY =
061      "hbase.master.quotas.observer.chore.delay";
062  static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
063
064  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
065      "hbase.master.quotas.observer.chore.timeunit";
066  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
067
068  static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
069      "hbase.master.quotas.observer.report.percent";
070  static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
071
072  static final String REGION_REPORT_RETENTION_DURATION_KEY =
073      "hbase.master.quotas.region.report.retention.millis";
074  static final long REGION_REPORT_RETENTION_DURATION_DEFAULT =
075      1000 * 60 * 10; // 10 minutes
076
077  private final Connection conn;
078  private final Configuration conf;
079  private final MasterQuotaManager quotaManager;
080  private final MetricsMaster metrics;
081  /*
082   * Callback that changes in quota snapshots are passed to.
083   */
084  private final SpaceQuotaSnapshotNotifier snapshotNotifier;
085
086  /*
087   * Preserves the state of quota snapshots for tables and namespaces
088   */
089  private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots;
090  private final Map<TableName,SpaceQuotaSnapshot> readOnlyTableQuotaSnapshots;
091  private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots;
092  private final Map<String,SpaceQuotaSnapshot> readOnlyNamespaceSnapshots;
093
094  // The time, in millis, that region reports should be kept by the master
095  private final long regionReportLifetimeMillis;
096
097  /*
098   * Encapsulates logic for tracking the state of a table/namespace WRT space quotas
099   */
100  private QuotaSnapshotStore<TableName> tableSnapshotStore;
101  private QuotaSnapshotStore<String> namespaceSnapshotStore;
102
103  public QuotaObserverChore(HMaster master, MetricsMaster metrics) {
104    this(
105        master.getConnection(), master.getConfiguration(),
106        master.getSpaceQuotaSnapshotNotifier(), master.getMasterQuotaManager(),
107        master, metrics);
108  }
109
110  QuotaObserverChore(
111      Connection conn, Configuration conf, SpaceQuotaSnapshotNotifier snapshotNotifier,
112      MasterQuotaManager quotaManager, Stoppable stopper, MetricsMaster metrics) {
113    super(
114        QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
115        getInitialDelay(conf), getTimeUnit(conf));
116    this.conn = conn;
117    this.conf = conf;
118    this.metrics = metrics;
119    this.quotaManager = quotaManager;
120    this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
121    this.tableQuotaSnapshots = new ConcurrentHashMap<>();
122    this.readOnlyTableQuotaSnapshots = Collections.unmodifiableMap(tableQuotaSnapshots);
123    this.namespaceQuotaSnapshots = new ConcurrentHashMap<>();
124    this.readOnlyNamespaceSnapshots = Collections.unmodifiableMap(namespaceQuotaSnapshots);
125    this.regionReportLifetimeMillis = conf.getLong(
126        REGION_REPORT_RETENTION_DURATION_KEY, REGION_REPORT_RETENTION_DURATION_DEFAULT);
127  }
128
129  @Override
130  protected void chore() {
131    try {
132      if (LOG.isTraceEnabled()) {
133        LOG.trace("Refreshing space quotas in RegionServer");
134      }
135      long start = System.nanoTime();
136      _chore();
137      if (metrics != null) {
138        metrics.incrementQuotaObserverTime((System.nanoTime() - start) / 1_000_000);
139      }
140    } catch (IOException e) {
141      LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
142    }
143  }
144
145  void _chore() throws IOException {
146    // Get the total set of tables that have quotas defined. Includes table quotas
147    // and tables included by namespace quotas.
148    TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
149    if (LOG.isTraceEnabled()) {
150      LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
151    }
152
153    if (metrics != null) {
154      // Set the number of namespaces and tables with quotas defined
155      metrics.setNumSpaceQuotas(tablesWithQuotas.getTableQuotaTables().size()
156          + tablesWithQuotas.getNamespacesWithQuotas().size());
157    }
158
159    // The current "view" of region space use. Used henceforth.
160    final Map<RegionInfo,Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
161    if (LOG.isTraceEnabled()) {
162      LOG.trace(
163          "Using " + reportedRegionSpaceUse.size() + " region space use reports: " +
164          reportedRegionSpaceUse);
165    }
166
167    // Remove the "old" region reports
168    pruneOldRegionReports();
169
170    // Create the stores to track table and namespace snapshots
171    initializeSnapshotStores(reportedRegionSpaceUse);
172    // Report the number of (non-expired) region size reports
173    if (metrics != null) {
174      metrics.setNumRegionSizeReports(reportedRegionSpaceUse.size());
175    }
176
177    // Filter out tables for which we don't have adequate regionspace reports yet.
178    // Important that we do this after we instantiate the stores above
179    // This gives us a set of Tables which may or may not be violating their quota.
180    // To be safe, we want to make sure that these are not in violation.
181    Set<TableName> tablesInLimbo = tablesWithQuotas.filterInsufficientlyReportedTables(
182        tableSnapshotStore);
183
184    if (LOG.isTraceEnabled()) {
185      LOG.trace("Filtered insufficiently reported tables, left with " +
186          reportedRegionSpaceUse.size() + " regions reported");
187    }
188
189    for (TableName tableInLimbo : tablesInLimbo) {
190      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
191      SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
192      if (currentStatus.isInViolation()) {
193        if (LOG.isTraceEnabled()) {
194          LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
195              + " reported than required.");
196        }
197        SpaceQuotaSnapshot targetSnapshot = new SpaceQuotaSnapshot(
198            SpaceQuotaStatus.notInViolation(), currentSnapshot.getUsage(),
199            currentSnapshot.getLimit());
200        this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
201        // Update it in the Table QuotaStore so that memory is consistent with no violation.
202        tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
203        // In case of Disable SVP, we need to enable the table as it moves out of violation
204        if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy()) {
205          QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
206        }
207      }
208    }
209
210    // Transition each table to/from quota violation based on the current and target state.
211    // Only table quotas are enacted.
212    final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
213    processTablesWithQuotas(tablesWithTableQuotas);
214
215    // For each Namespace quota, transition each table in the namespace in or out of violation
216    // only if a table quota violation policy has not already been applied.
217    final Set<String> namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas();
218    final Multimap<String,TableName> tablesByNamespace = tablesWithQuotas.getTablesByNamespace();
219    processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
220  }
221
222  void initializeSnapshotStores(Map<RegionInfo,Long> regionSizes) {
223    Map<RegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
224    if (tableSnapshotStore == null) {
225      tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
226    } else {
227      tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
228    }
229    if (namespaceSnapshotStore == null) {
230      namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(
231          conn, this, immutableRegionSpaceUse);
232    } else {
233      namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
234    }
235  }
236
237  /**
238   * Processes each {@code TableName} which has a quota defined and moves it in or out of
239   * violation based on the space use.
240   *
241   * @param tablesWithTableQuotas The HBase tables which have quotas defined
242   */
243  void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
244    long numTablesInViolation = 0L;
245    for (TableName table : tablesWithTableQuotas) {
246      final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
247      if (spaceQuota == null) {
248        if (LOG.isDebugEnabled()) {
249          LOG.debug("Unexpectedly did not find a space quota for " + table
250              + ", maybe it was recently deleted.");
251        }
252        continue;
253      }
254      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
255      final SpaceQuotaSnapshot targetSnapshot = tableSnapshotStore.getTargetState(table, spaceQuota);
256      if (LOG.isTraceEnabled()) {
257        LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target="
258            + targetSnapshot);
259      }
260      updateTableQuota(table, currentSnapshot, targetSnapshot);
261
262      if (targetSnapshot.getQuotaStatus().isInViolation()) {
263        numTablesInViolation++;
264      }
265    }
266    // Report the number of tables in violation
267    if (metrics != null) {
268      metrics.setNumTableInSpaceQuotaViolation(numTablesInViolation);
269    }
270  }
271
272  /**
273   * Processes each namespace which has a quota defined and moves all of the tables contained
274   * in that namespace into or out of violation of the quota. Tables which are already in
275   * violation of a quota at the table level which <em>also</em> have a reside in a namespace
276   * with a violated quota will not have the namespace quota enacted. The table quota takes
277   * priority over the namespace quota.
278   *
279   * @param namespacesWithQuotas The set of namespaces that have quotas defined
280   * @param tablesByNamespace A mapping of namespaces and the tables contained in those namespaces
281   */
282  void processNamespacesWithQuotas(
283      final Set<String> namespacesWithQuotas,
284      final Multimap<String,TableName> tablesByNamespace) throws IOException {
285    long numNamespacesInViolation = 0L;
286    for (String namespace : namespacesWithQuotas) {
287      // Get the quota definition for the namespace
288      final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
289      if (spaceQuota == null) {
290        if (LOG.isDebugEnabled()) {
291          LOG.debug("Could not get Namespace space quota for " + namespace
292              + ", maybe it was recently deleted.");
293        }
294        continue;
295      }
296      final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
297      final SpaceQuotaSnapshot targetSnapshot = namespaceSnapshotStore.getTargetState(
298          namespace, spaceQuota);
299      if (LOG.isTraceEnabled()) {
300        LOG.trace("Processing " + namespace + " with current=" + currentSnapshot + ", target="
301            + targetSnapshot);
302      }
303      updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
304
305      if (targetSnapshot.getQuotaStatus().isInViolation()) {
306        numNamespacesInViolation++;
307      }
308    }
309
310    // Report the number of namespaces in violation
311    if (metrics != null) {
312      metrics.setNumNamespacesInSpaceQuotaViolation(numNamespacesInViolation);
313    }
314  }
315
316  /**
317   * Updates the hbase:quota table with the new quota policy for this <code>table</code>
318   * if necessary.
319   *
320   * @param table The table being checked
321   * @param currentSnapshot The state of the quota on this table from the previous invocation.
322   * @param targetSnapshot The state the quota should be in for this table.
323   */
324  void updateTableQuota(
325      TableName table, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot)
326          throws IOException {
327    final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
328    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
329
330    // If we're changing something, log it.
331    if (!currentSnapshot.equals(targetSnapshot)) {
332      this.snapshotNotifier.transitionTable(table, targetSnapshot);
333      // Update it in memory
334      tableSnapshotStore.setCurrentState(table, targetSnapshot);
335
336      // If the target is none, we're moving out of violation. Update the hbase:quota table
337      SpaceViolationPolicy currPolicy = currentStatus.getPolicy();
338      SpaceViolationPolicy targetPolicy = targetStatus.getPolicy();
339      if (!targetStatus.isInViolation()) {
340        // In case of Disable SVP, we need to enable the table as it moves out of violation
341        if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
342          QuotaUtil.enableTableIfNotEnabled(conn, table);
343        }
344        if (LOG.isDebugEnabled()) {
345          LOG.debug(table + " moved into observance of table space quota.");
346        }
347      } else {
348        // We're either moving into violation or changing violation policies
349        if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
350          // In case of policy switch, we need to enable the table if current policy is Disable SVP
351          QuotaUtil.enableTableIfNotEnabled(conn, table);
352        } else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
353          // In case of Disable SVP, we need to disable the table as it moves into violation
354          QuotaUtil.disableTableIfNotDisabled(conn, table);
355        }
356        if (LOG.isDebugEnabled()) {
357          LOG.debug(
358            table + " moved into violation of table space quota with policy of " + targetPolicy);
359        }
360      }
361    } else if (LOG.isTraceEnabled()) {
362      // Policies are the same, so we have nothing to do except log this. Don't need to re-update
363      // the quota table
364      if (!currentStatus.isInViolation()) {
365        LOG.trace(table + " remains in observance of quota.");
366      } else {
367        LOG.trace(table + " remains in violation of quota.");
368      }
369    }
370  }
371
372  /**
373   * Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
374   * case, currPolicy or/and targetPolicy will be having DISABLE policy.
375   * @param currPolicy currently set space violation policy
376   * @param targetPolicy new space violation policy
377   * @return true if is DISABLE space violation policy; otherwise false
378   */
379  private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
380      final SpaceViolationPolicy targetPolicy) {
381    return SpaceViolationPolicy.DISABLE == currPolicy
382        || SpaceViolationPolicy.DISABLE == targetPolicy;
383  }
384
385  /**
386   * Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
387   * if necessary.
388   *
389   * @param namespace The namespace being checked
390   * @param currentSnapshot The state of the quota on this namespace from the previous invocation
391   * @param targetSnapshot The state the quota should be in for this namespace
392   * @param tablesByNamespace A mapping of tables in namespaces.
393   */
394  void updateNamespaceQuota(
395      String namespace, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot,
396      final Multimap<String,TableName> tablesByNamespace) throws IOException {
397    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
398
399    // When the policies differ, we need to move into or out of violation
400    if (!currentSnapshot.equals(targetSnapshot)) {
401      // We want to have a policy of "NONE", moving out of violation
402      if (!targetStatus.isInViolation()) {
403        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
404          // If there is a quota on this table in violation
405          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
406            // Table-level quota violation policy is being applied here.
407            if (LOG.isTraceEnabled()) {
408              LOG.trace("Not activating Namespace violation policy because a Table violation"
409                  + " policy is already in effect for " + tableInNS);
410            }
411          } else {
412            LOG.info(tableInNS + " moving into observance of namespace space quota");
413            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
414          }
415        }
416      // We want to move into violation at the NS level
417      } else {
418        // Moving tables in the namespace into violation or to a different violation policy
419        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
420          final SpaceQuotaSnapshot tableQuotaSnapshot =
421                tableSnapshotStore.getCurrentState(tableInNS);
422          final boolean hasTableQuota =
423              !Objects.equals(QuotaSnapshotStore.NO_QUOTA, tableQuotaSnapshot);
424          if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
425            // Table-level quota violation policy is being applied here.
426            if (LOG.isTraceEnabled()) {
427              LOG.trace("Not activating Namespace violation policy because a Table violation"
428                  + " policy is already in effect for " + tableInNS);
429            }
430          } else {
431            // No table quota present or a table quota present that is not in violation
432            LOG.info(tableInNS + " moving into violation of namespace space quota with policy "
433                + targetStatus.getPolicy());
434            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
435          }
436        }
437      }
438      // Update the new state in memory for this namespace
439      namespaceSnapshotStore.setCurrentState(namespace, targetSnapshot);
440    } else {
441      // Policies are the same
442      if (!targetStatus.isInViolation()) {
443        // Both are NONE, so we remain in observance
444        if (LOG.isTraceEnabled()) {
445          LOG.trace(namespace + " remains in observance of quota.");
446        }
447      } else {
448        // Namespace quota is still in violation, need to enact if the table quota is not
449        // taking priority.
450        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
451          // Does a table policy exist
452          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
453            // Table-level quota violation policy is being applied here.
454            if (LOG.isTraceEnabled()) {
455              LOG.trace("Not activating Namespace violation policy because Table violation"
456                  + " policy is already in effect for " + tableInNS);
457            }
458          } else {
459            // No table policy, so enact namespace policy
460            LOG.info(tableInNS + " moving into violation of namespace space quota");
461            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
462          }
463        }
464      }
465    }
466  }
467
468  /**
469   * Removes region reports over a certain age.
470   */
471  void pruneOldRegionReports() {
472    final long now = EnvironmentEdgeManager.currentTime();
473    final long pruneTime = now - regionReportLifetimeMillis;
474    final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime);
475    if (LOG.isTraceEnabled()) {
476      LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
477          + pruneTime + ".");
478    }
479  }
480
481  /**
482   * Computes the set of all tables that have quotas defined. This includes tables with quotas
483   * explicitly set on them, in addition to tables that exist namespaces which have a quota
484   * defined.
485   */
486  TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
487    final Scan scan = QuotaTableUtil.makeScan(null);
488    final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
489    try (final QuotaRetriever scanner = new QuotaRetriever()) {
490      scanner.init(conn, scan);
491      for (QuotaSettings quotaSettings : scanner) {
492        // Only one of namespace and tablename should be 'null'
493        final String namespace = quotaSettings.getNamespace();
494        final TableName tableName = quotaSettings.getTableName();
495        if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
496          continue;
497        }
498
499        if (namespace != null) {
500          assert tableName == null;
501          // Collect all of the tables in the namespace
502          TableName[] tablesInNS = conn.getAdmin().listTableNamesByNamespace(namespace);
503          for (TableName tableUnderNs : tablesInNS) {
504            if (LOG.isTraceEnabled()) {
505              LOG.trace("Adding " + tableUnderNs + " under " +  namespace
506                  + " as having a namespace quota");
507            }
508            tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
509          }
510        } else {
511          assert tableName != null;
512          if (LOG.isTraceEnabled()) {
513            LOG.trace("Adding " + tableName + " as having table quota.");
514          }
515          // namespace is already null, must be a non-null tableName
516          tablesWithQuotas.addTableQuotaTable(tableName);
517        }
518      }
519      return tablesWithQuotas;
520    }
521  }
522
523  @VisibleForTesting
524  QuotaSnapshotStore<TableName> getTableSnapshotStore() {
525    return tableSnapshotStore;
526  }
527
528  @VisibleForTesting
529  QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
530    return namespaceSnapshotStore;
531  }
532
533  /**
534   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects
535   * for each HBase table with a quota defined.
536   */
537  public Map<TableName,SpaceQuotaSnapshot> getTableQuotaSnapshots() {
538    return readOnlyTableQuotaSnapshots;
539  }
540
541  /**
542   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects
543   * for each HBase namespace with a quota defined.
544   */
545  public Map<String,SpaceQuotaSnapshot> getNamespaceQuotaSnapshots() {
546    return readOnlyNamespaceSnapshots;
547  }
548
549  /**
550   * Fetches the {@link SpaceQuotaSnapshot} for the given table.
551   */
552  SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
553    SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
554    if (state == null) {
555      // No tracked state implies observance.
556      return QuotaSnapshotStore.NO_QUOTA;
557    }
558    return state;
559  }
560
561  /**
562   * Stores the quota state for the given table.
563   */
564  void setTableQuotaSnapshot(TableName table, SpaceQuotaSnapshot snapshot) {
565    this.tableQuotaSnapshots.put(table, snapshot);
566  }
567
568  /**
569   * Fetches the {@link SpaceQuotaSnapshot} for the given namespace from this chore.
570   */
571  SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
572    SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
573    if (state == null) {
574      // No tracked state implies observance.
575      return QuotaSnapshotStore.NO_QUOTA;
576    }
577    return state;
578  }
579
580  /**
581   * Stores the given {@code snapshot} for the given {@code namespace} in this chore.
582   */
583  void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
584    this.namespaceQuotaSnapshots.put(namespace, snapshot);
585  }
586
587  /**
588   * Extracts the period for the chore from the configuration.
589   *
590   * @param conf The configuration object.
591   * @return The configured chore period or the default value in the given timeunit.
592   * @see #getTimeUnit(Configuration)
593   */
594  static int getPeriod(Configuration conf) {
595    return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY,
596        QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
597  }
598
599  /**
600   * Extracts the initial delay for the chore from the configuration.
601   *
602   * @param conf The configuration object.
603   * @return The configured chore initial delay or the default value in the given timeunit.
604   * @see #getTimeUnit(Configuration)
605   */
606  static long getInitialDelay(Configuration conf) {
607    return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY,
608        QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
609  }
610
611  /**
612   * Extracts the time unit for the chore period and initial delay from the configuration. The
613   * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
614   * a {@link TimeUnit} value.
615   *
616   * @param conf The configuration object.
617   * @return The configured time unit for the chore period and initial delay or the default value.
618   */
619  static TimeUnit getTimeUnit(Configuration conf) {
620    return TimeUnit.valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY,
621        QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
622  }
623
624  /**
625   * Extracts the percent of Regions for a table to have been reported to enable quota violation
626   * state change.
627   *
628   * @param conf The configuration object.
629   * @return The percent of regions reported to use.
630   */
631  static Double getRegionReportPercent(Configuration conf) {
632    return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
633        QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
634  }
635
636  /**
637   * A container which encapsulates the tables that have either a table quota or are contained in a
638   * namespace which have a namespace quota.
639   */
640  static class TablesWithQuotas {
641    private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
642    private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
643    private final Connection conn;
644    private final Configuration conf;
645
646    public TablesWithQuotas(Connection conn, Configuration conf) {
647      this.conn = Objects.requireNonNull(conn);
648      this.conf = Objects.requireNonNull(conf);
649    }
650
651    Configuration getConfiguration() {
652      return conf;
653    }
654
655    /**
656     * Adds a table with a table quota.
657     */
658    public void addTableQuotaTable(TableName tn) {
659      tablesWithTableQuotas.add(tn);
660    }
661
662    /**
663     * Adds a table with a namespace quota.
664     */
665    public void addNamespaceQuotaTable(TableName tn) {
666      tablesWithNamespaceQuotas.add(tn);
667    }
668
669    /**
670     * Returns true if the given table has a table quota.
671     */
672    public boolean hasTableQuota(TableName tn) {
673      return tablesWithTableQuotas.contains(tn);
674    }
675
676    /**
677     * Returns true if the table exists in a namespace with a namespace quota.
678     */
679    public boolean hasNamespaceQuota(TableName tn) {
680      return tablesWithNamespaceQuotas.contains(tn);
681    }
682
683    /**
684     * Returns an unmodifiable view of all tables with table quotas.
685     */
686    public Set<TableName> getTableQuotaTables() {
687      return Collections.unmodifiableSet(tablesWithTableQuotas);
688    }
689
690    /**
691     * Returns an unmodifiable view of all tables in namespaces that have
692     * namespace quotas.
693     */
694    public Set<TableName> getNamespaceQuotaTables() {
695      return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
696    }
697
698    public Set<String> getNamespacesWithQuotas() {
699      Set<String> namespaces = new HashSet<>();
700      for (TableName tn : tablesWithNamespaceQuotas) {
701        namespaces.add(tn.getNamespaceAsString());
702      }
703      return namespaces;
704    }
705
706    /**
707     * Returns a view of all tables that reside in a namespace with a namespace
708     * quota, grouped by the namespace itself.
709     */
710    public Multimap<String,TableName> getTablesByNamespace() {
711      Multimap<String,TableName> tablesByNS = HashMultimap.create();
712      for (TableName tn : tablesWithNamespaceQuotas) {
713        tablesByNS.put(tn.getNamespaceAsString(), tn);
714      }
715      return tablesByNS;
716    }
717
718    /**
719     * Filters out all tables for which the Master currently doesn't have enough region space
720     * reports received from RegionServers yet.
721     */
722    public Set<TableName> filterInsufficientlyReportedTables(
723        QuotaSnapshotStore<TableName> tableStore) throws IOException {
724      final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
725      Set<TableName> tablesToRemove = new HashSet<>();
726      for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
727        // Don't recompute a table we've already computed
728        if (tablesToRemove.contains(table)) {
729          continue;
730        }
731        final int numRegionsInTable = getNumRegions(table);
732        // If the table doesn't exist (no regions), bail out.
733        if (numRegionsInTable == 0) {
734          if (LOG.isTraceEnabled()) {
735            LOG.trace("Filtering " + table + " because no regions were reported");
736          }
737          tablesToRemove.add(table);
738          continue;
739        }
740        final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
741        final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
742        if (ratioReported < percentRegionsReportedThreshold) {
743          if (LOG.isTraceEnabled()) {
744            LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota  + " of " +
745                numRegionsInTable + " regions were reported.");
746          }
747          tablesToRemove.add(table);
748        } else if (LOG.isTraceEnabled()) {
749          LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota  + " of " +
750              numRegionsInTable + " regions were reported.");
751        }
752      }
753      for (TableName tableToRemove : tablesToRemove) {
754        tablesWithTableQuotas.remove(tableToRemove);
755        tablesWithNamespaceQuotas.remove(tableToRemove);
756      }
757      return tablesToRemove;
758    }
759
760    /**
761     * Computes the total number of regions in a table.
762     */
763    int getNumRegions(TableName table) throws IOException {
764      List<RegionInfo> regions = this.conn.getAdmin().getRegions(table);
765      if (regions == null) {
766        return 0;
767      }
768      return regions.size();
769    }
770
771    /**
772     * Computes the number of regions reported for a table.
773     */
774    int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
775        throws IOException {
776      return Iterables.size(tableStore.filterBySubject(table));
777    }
778
779    @Override
780    public String toString() {
781      final StringBuilder sb = new StringBuilder(32);
782      sb.append(getClass().getSimpleName())
783          .append(": tablesWithTableQuotas=")
784          .append(this.tablesWithTableQuotas)
785          .append(", tablesWithNamespaceQuotas=")
786          .append(this.tablesWithNamespaceQuotas);
787      return sb.toString();
788    }
789  }
790}