001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
049
050/**
051 * Reads the currently received Region filesystem-space use reports and acts on those which
052 * violate a defined quota.
053 */
054@InterfaceAudience.Private
055public class QuotaObserverChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaObserverChore.class);

  // Configuration key and default for how often this chore runs. The value is interpreted in
  // the time unit configured by QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY (milliseconds by default).
  static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY =
      "hbase.master.quotas.observer.chore.period";
  static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis

  // Configuration key and default for the delay before the first run of this chore. The value
  // is interpreted in the time unit configured by QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY.
  static final String QUOTA_OBSERVER_CHORE_DELAY_KEY =
      "hbase.master.quotas.observer.chore.delay";
  static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis

  // Configuration key and default for the TimeUnit name that applies to both the period and
  // the initial delay above.
  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
      "hbase.master.quotas.observer.chore.timeunit";
  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  // Configuration key and default for the fraction (0.95 == 95%) of a table's regions that
  // must have reported their size before the table's quota state is acted upon.
  static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
      "hbase.master.quotas.observer.report.percent";
  static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;

  // Configuration key and default for how long, in milliseconds, a region size report is
  // retained before pruneOldRegionReports() asks the quota manager to drop it.
  static final String REGION_REPORT_RETENTION_DURATION_KEY =
      "hbase.master.quotas.region.report.retention.millis";
  static final long REGION_REPORT_RETENTION_DURATION_DEFAULT =
      1000 * 60 * 10; // 10 minutes

  private final Connection conn;
  private final Configuration conf;
  private final MasterQuotaManager quotaManager;
  // May be null; every use in this class is guarded by a null check.
  private final MetricsMaster metrics;
  /*
   * Callback that changes in quota snapshots are passed to.
   */
  private final SpaceQuotaSnapshotNotifier snapshotNotifier;

  /*
   * Preserves the state of quota snapshots for tables and namespaces. The "readOnly" maps are
   * unmodifiable views over the corresponding mutable maps and are what the public getters
   * hand out.
   */
  private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots;
  private final Map<TableName,SpaceQuotaSnapshot> readOnlyTableQuotaSnapshots;
  private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots;
  private final Map<String,SpaceQuotaSnapshot> readOnlyNamespaceSnapshots;

  // The time, in millis, that region reports should be kept by the master
  private final long regionReportLifetimeMillis;

  /*
   * Encapsulates logic for tracking the state of a table/namespace WRT space quotas. Both
   * stores are created lazily by initializeSnapshotStores(Map) and are null until then.
   */
  private QuotaSnapshotStore<TableName> tableSnapshotStore;
  private QuotaSnapshotStore<String> namespaceSnapshotStore;
103
104  public QuotaObserverChore(HMaster master, MetricsMaster metrics) {
105    this(
106        master.getConnection(), master.getConfiguration(),
107        master.getSpaceQuotaSnapshotNotifier(), master.getMasterQuotaManager(),
108        master, metrics);
109  }
110
111  QuotaObserverChore(
112      Connection conn, Configuration conf, SpaceQuotaSnapshotNotifier snapshotNotifier,
113      MasterQuotaManager quotaManager, Stoppable stopper, MetricsMaster metrics) {
114    super(
115        QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
116        getInitialDelay(conf), getTimeUnit(conf));
117    this.conn = conn;
118    this.conf = conf;
119    this.metrics = metrics;
120    this.quotaManager = quotaManager;
121    this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
122    this.tableQuotaSnapshots = new ConcurrentHashMap<>();
123    this.readOnlyTableQuotaSnapshots = Collections.unmodifiableMap(tableQuotaSnapshots);
124    this.namespaceQuotaSnapshots = new ConcurrentHashMap<>();
125    this.readOnlyNamespaceSnapshots = Collections.unmodifiableMap(namespaceQuotaSnapshots);
126    this.regionReportLifetimeMillis = conf.getLong(
127        REGION_REPORT_RETENTION_DURATION_KEY, REGION_REPORT_RETENTION_DURATION_DEFAULT);
128  }
129
130  @Override
131  protected void chore() {
132    try {
133      if (LOG.isTraceEnabled()) {
134        LOG.trace("Refreshing space quotas in RegionServer");
135      }
136      long start = System.nanoTime();
137      _chore();
138      if (metrics != null) {
139        metrics.incrementQuotaObserverTime((System.nanoTime() - start) / 1_000_000);
140      }
141    } catch (IOException e) {
142      LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
143    }
144  }
145
146  void _chore() throws IOException {
147    // Get the total set of tables that have quotas defined. Includes table quotas
148    // and tables included by namespace quotas.
149    TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
150    if (LOG.isTraceEnabled()) {
151      LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
152    }
153
154    if (metrics != null) {
155      // Set the number of namespaces and tables with quotas defined
156      metrics.setNumSpaceQuotas(tablesWithQuotas.getTableQuotaTables().size()
157          + tablesWithQuotas.getNamespacesWithQuotas().size());
158    }
159
160    // The current "view" of region space use. Used henceforth.
161    final Map<RegionInfo,Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
162    if (LOG.isTraceEnabled()) {
163      LOG.trace(
164          "Using " + reportedRegionSpaceUse.size() + " region space use reports: " +
165          reportedRegionSpaceUse);
166    }
167
168    // Remove the "old" region reports
169    pruneOldRegionReports();
170
171    // Create the stores to track table and namespace snapshots
172    initializeSnapshotStores(reportedRegionSpaceUse);
173    // Report the number of (non-expired) region size reports
174    if (metrics != null) {
175      metrics.setNumRegionSizeReports(reportedRegionSpaceUse.size());
176    }
177
178    // Filter out tables for which we don't have adequate regionspace reports yet.
179    // Important that we do this after we instantiate the stores above
180    // This gives us a set of Tables which may or may not be violating their quota.
181    // To be safe, we want to make sure that these are not in violation.
182    Set<TableName> tablesInLimbo = tablesWithQuotas.filterInsufficientlyReportedTables(
183        tableSnapshotStore);
184
185    if (LOG.isTraceEnabled()) {
186      LOG.trace("Filtered insufficiently reported tables, left with " +
187          reportedRegionSpaceUse.size() + " regions reported");
188    }
189
190    for (TableName tableInLimbo : tablesInLimbo) {
191      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
192      SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
193      if (currentStatus.isInViolation()) {
194        if (LOG.isTraceEnabled()) {
195          LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
196              + " reported than required.");
197        }
198        SpaceQuotaSnapshot targetSnapshot = new SpaceQuotaSnapshot(
199            SpaceQuotaStatus.notInViolation(), currentSnapshot.getUsage(),
200            currentSnapshot.getLimit());
201        this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
202        // Update it in the Table QuotaStore so that memory is consistent with no violation.
203        tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
204        // In case of Disable SVP, we need to enable the table as it moves out of violation
205        if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy().orElse(null)) {
206          QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
207        }
208      }
209    }
210
211    // Transition each table to/from quota violation based on the current and target state.
212    // Only table quotas are enacted.
213    final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
214    processTablesWithQuotas(tablesWithTableQuotas);
215
216    // For each Namespace quota, transition each table in the namespace in or out of violation
217    // only if a table quota violation policy has not already been applied.
218    final Set<String> namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas();
219    final Multimap<String,TableName> tablesByNamespace = tablesWithQuotas.getTablesByNamespace();
220    processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
221  }
222
223  void initializeSnapshotStores(Map<RegionInfo,Long> regionSizes) {
224    Map<RegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
225    if (tableSnapshotStore == null) {
226      tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
227    } else {
228      tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
229    }
230    if (namespaceSnapshotStore == null) {
231      namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(
232          conn, this, immutableRegionSpaceUse);
233    } else {
234      namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
235    }
236  }
237
238  /**
239   * Processes each {@code TableName} which has a quota defined and moves it in or out of
240   * violation based on the space use.
241   *
242   * @param tablesWithTableQuotas The HBase tables which have quotas defined
243   */
244  void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
245    long numTablesInViolation = 0L;
246    for (TableName table : tablesWithTableQuotas) {
247      final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
248      if (spaceQuota == null) {
249        if (LOG.isDebugEnabled()) {
250          LOG.debug("Unexpectedly did not find a space quota for " + table
251              + ", maybe it was recently deleted.");
252        }
253        continue;
254      }
255      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
256      final SpaceQuotaSnapshot targetSnapshot = tableSnapshotStore.getTargetState(table, spaceQuota);
257      if (LOG.isTraceEnabled()) {
258        LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target="
259            + targetSnapshot);
260      }
261      updateTableQuota(table, currentSnapshot, targetSnapshot);
262
263      if (targetSnapshot.getQuotaStatus().isInViolation()) {
264        numTablesInViolation++;
265      }
266    }
267    // Report the number of tables in violation
268    if (metrics != null) {
269      metrics.setNumTableInSpaceQuotaViolation(numTablesInViolation);
270    }
271  }
272
273  /**
274   * Processes each namespace which has a quota defined and moves all of the tables contained
275   * in that namespace into or out of violation of the quota. Tables which are already in
276   * violation of a quota at the table level which <em>also</em> have a reside in a namespace
277   * with a violated quota will not have the namespace quota enacted. The table quota takes
278   * priority over the namespace quota.
279   *
280   * @param namespacesWithQuotas The set of namespaces that have quotas defined
281   * @param tablesByNamespace A mapping of namespaces and the tables contained in those namespaces
282   */
283  void processNamespacesWithQuotas(
284      final Set<String> namespacesWithQuotas,
285      final Multimap<String,TableName> tablesByNamespace) throws IOException {
286    long numNamespacesInViolation = 0L;
287    for (String namespace : namespacesWithQuotas) {
288      // Get the quota definition for the namespace
289      final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
290      if (spaceQuota == null) {
291        if (LOG.isDebugEnabled()) {
292          LOG.debug("Could not get Namespace space quota for " + namespace
293              + ", maybe it was recently deleted.");
294        }
295        continue;
296      }
297      final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
298      final SpaceQuotaSnapshot targetSnapshot = namespaceSnapshotStore.getTargetState(
299          namespace, spaceQuota);
300      if (LOG.isTraceEnabled()) {
301        LOG.trace("Processing " + namespace + " with current=" + currentSnapshot + ", target="
302            + targetSnapshot);
303      }
304      updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
305
306      if (targetSnapshot.getQuotaStatus().isInViolation()) {
307        numNamespacesInViolation++;
308      }
309    }
310
311    // Report the number of namespaces in violation
312    if (metrics != null) {
313      metrics.setNumNamespacesInSpaceQuotaViolation(numNamespacesInViolation);
314    }
315  }
316
317  /**
318   * Updates the hbase:quota table with the new quota policy for this <code>table</code>
319   * if necessary.
320   *
321   * @param table The table being checked
322   * @param currentSnapshot The state of the quota on this table from the previous invocation.
323   * @param targetSnapshot The state the quota should be in for this table.
324   */
325  void updateTableQuota(
326      TableName table, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot)
327          throws IOException {
328    final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
329    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
330
331    // If we're changing something, log it.
332    if (!currentSnapshot.equals(targetSnapshot)) {
333      this.snapshotNotifier.transitionTable(table, targetSnapshot);
334      // Update it in memory
335      tableSnapshotStore.setCurrentState(table, targetSnapshot);
336
337      // If the target is none, we're moving out of violation. Update the hbase:quota table
338      SpaceViolationPolicy currPolicy = currentStatus.getPolicy().orElse(null);
339      SpaceViolationPolicy targetPolicy = targetStatus.getPolicy().orElse(null);
340      if (!targetStatus.isInViolation()) {
341        // In case of Disable SVP, we need to enable the table as it moves out of violation
342        if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
343          QuotaUtil.enableTableIfNotEnabled(conn, table);
344        }
345        if (LOG.isDebugEnabled()) {
346          LOG.debug(table + " moved into observance of table space quota.");
347        }
348      } else {
349        // We're either moving into violation or changing violation policies
350        if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
351          // In case of policy switch, we need to enable the table if current policy is Disable SVP
352          QuotaUtil.enableTableIfNotEnabled(conn, table);
353        } else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
354          // In case of Disable SVP, we need to disable the table as it moves into violation
355          QuotaUtil.disableTableIfNotDisabled(conn, table);
356        }
357        if (LOG.isDebugEnabled()) {
358          LOG.debug(
359            table + " moved into violation of table space quota with policy of " + targetPolicy);
360        }
361      }
362    } else if (LOG.isTraceEnabled()) {
363      // Policies are the same, so we have nothing to do except log this. Don't need to re-update
364      // the quota table
365      if (!currentStatus.isInViolation()) {
366        LOG.trace(table + " remains in observance of quota.");
367      } else {
368        LOG.trace(table + " remains in violation of quota.");
369      }
370    }
371  }
372
373  /**
374   * Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
375   * case, currPolicy or/and targetPolicy will be having DISABLE policy.
376   * @param currPolicy currently set space violation policy
377   * @param targetPolicy new space violation policy
378   * @return true if is DISABLE space violation policy; otherwise false
379   */
380  private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
381      final SpaceViolationPolicy targetPolicy) {
382    return SpaceViolationPolicy.DISABLE == currPolicy
383        || SpaceViolationPolicy.DISABLE == targetPolicy;
384  }
385
386  /**
387   * Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
388   * if necessary.
389   *
390   * @param namespace The namespace being checked
391   * @param currentSnapshot The state of the quota on this namespace from the previous invocation
392   * @param targetSnapshot The state the quota should be in for this namespace
393   * @param tablesByNamespace A mapping of tables in namespaces.
394   */
395  void updateNamespaceQuota(
396      String namespace, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot,
397      final Multimap<String,TableName> tablesByNamespace) throws IOException {
398    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
399
400    // When the policies differ, we need to move into or out of violation
401    if (!currentSnapshot.equals(targetSnapshot)) {
402      // We want to have a policy of "NONE", moving out of violation
403      if (!targetStatus.isInViolation()) {
404        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
405          // If there is a quota on this table in violation
406          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
407            // Table-level quota violation policy is being applied here.
408            if (LOG.isTraceEnabled()) {
409              LOG.trace("Not activating Namespace violation policy because a Table violation"
410                  + " policy is already in effect for " + tableInNS);
411            }
412          } else {
413            LOG.info(tableInNS + " moving into observance of namespace space quota");
414            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
415          }
416        }
417      // We want to move into violation at the NS level
418      } else {
419        // Moving tables in the namespace into violation or to a different violation policy
420        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
421          final SpaceQuotaSnapshot tableQuotaSnapshot =
422                tableSnapshotStore.getCurrentState(tableInNS);
423          final boolean hasTableQuota =
424              !Objects.equals(QuotaSnapshotStore.NO_QUOTA, tableQuotaSnapshot);
425          if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
426            // Table-level quota violation policy is being applied here.
427            if (LOG.isTraceEnabled()) {
428              LOG.trace("Not activating Namespace violation policy because a Table violation"
429                  + " policy is already in effect for " + tableInNS);
430            }
431          } else {
432            // No table quota present or a table quota present that is not in violation
433            LOG.info(tableInNS + " moving into violation of namespace space quota with policy "
434                + targetStatus.getPolicy());
435            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
436          }
437        }
438      }
439      // Update the new state in memory for this namespace
440      namespaceSnapshotStore.setCurrentState(namespace, targetSnapshot);
441    } else {
442      // Policies are the same
443      if (!targetStatus.isInViolation()) {
444        // Both are NONE, so we remain in observance
445        if (LOG.isTraceEnabled()) {
446          LOG.trace(namespace + " remains in observance of quota.");
447        }
448      } else {
449        // Namespace quota is still in violation, need to enact if the table quota is not
450        // taking priority.
451        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
452          // Does a table policy exist
453          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
454            // Table-level quota violation policy is being applied here.
455            if (LOG.isTraceEnabled()) {
456              LOG.trace("Not activating Namespace violation policy because Table violation"
457                  + " policy is already in effect for " + tableInNS);
458            }
459          } else {
460            // No table policy, so enact namespace policy
461            LOG.info(tableInNS + " moving into violation of namespace space quota");
462            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
463          }
464        }
465      }
466    }
467  }
468
469  /**
470   * Removes region reports over a certain age.
471   */
472  void pruneOldRegionReports() {
473    final long now = EnvironmentEdgeManager.currentTime();
474    final long pruneTime = now - regionReportLifetimeMillis;
475    final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime,this);
476    if (LOG.isTraceEnabled()) {
477      LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
478          + pruneTime + ".");
479    }
480  }
481
482  /**
483   * Computes the set of all tables that have quotas defined. This includes tables with quotas
484   * explicitly set on them, in addition to tables that exist namespaces which have a quota
485   * defined.
486   */
487  TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
488    final Scan scan = QuotaTableUtil.makeScan(null);
489    final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
490    try (final QuotaRetriever scanner = new QuotaRetriever()) {
491      scanner.init(conn, scan);
492      for (QuotaSettings quotaSettings : scanner) {
493        // Only one of namespace and tablename should be 'null'
494        final String namespace = quotaSettings.getNamespace();
495        final TableName tableName = quotaSettings.getTableName();
496        if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
497          continue;
498        }
499
500        if (namespace != null) {
501          assert tableName == null;
502          // Collect all of the tables in the namespace
503          TableName[] tablesInNS = conn.getAdmin().listTableNamesByNamespace(namespace);
504          for (TableName tableUnderNs : tablesInNS) {
505            if (LOG.isTraceEnabled()) {
506              LOG.trace("Adding " + tableUnderNs + " under " +  namespace
507                  + " as having a namespace quota");
508            }
509            tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
510          }
511        } else {
512          assert tableName != null;
513          if (LOG.isTraceEnabled()) {
514            LOG.trace("Adding " + tableName + " as having table quota.");
515          }
516          // namespace is already null, must be a non-null tableName
517          tablesWithQuotas.addTableQuotaTable(tableName);
518        }
519      }
520      return tablesWithQuotas;
521    }
522  }
523
524  @VisibleForTesting
525  QuotaSnapshotStore<TableName> getTableSnapshotStore() {
526    return tableSnapshotStore;
527  }
528
529  @VisibleForTesting
530  QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
531    return namespaceSnapshotStore;
532  }
533
534  /**
535   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects
536   * for each HBase table with a quota defined.
537   */
538  public Map<TableName,SpaceQuotaSnapshot> getTableQuotaSnapshots() {
539    return readOnlyTableQuotaSnapshots;
540  }
541
542  /**
543   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects
544   * for each HBase namespace with a quota defined.
545   */
546  public Map<String,SpaceQuotaSnapshot> getNamespaceQuotaSnapshots() {
547    return readOnlyNamespaceSnapshots;
548  }
549
550  /**
551   * Fetches the {@link SpaceQuotaSnapshot} for the given table.
552   */
553  SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
554    SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
555    if (state == null) {
556      // No tracked state implies observance.
557      return QuotaSnapshotStore.NO_QUOTA;
558    }
559    return state;
560  }
561
562  /**
563   * Stores the quota state for the given table.
564   */
565  void setTableQuotaSnapshot(TableName table, SpaceQuotaSnapshot snapshot) {
566    this.tableQuotaSnapshots.put(table, snapshot);
567  }
568
569  /**
570   * Fetches the {@link SpaceQuotaSnapshot} for the given namespace from this chore.
571   */
572  SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
573    SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
574    if (state == null) {
575      // No tracked state implies observance.
576      return QuotaSnapshotStore.NO_QUOTA;
577    }
578    return state;
579  }
580
581  /**
582   * Stores the given {@code snapshot} for the given {@code namespace} in this chore.
583   */
584  void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
585    this.namespaceQuotaSnapshots.put(namespace, snapshot);
586  }
587
588  /**
589   * Extracts the period for the chore from the configuration.
590   *
591   * @param conf The configuration object.
592   * @return The configured chore period or the default value in the given timeunit.
593   * @see #getTimeUnit(Configuration)
594   */
595  static int getPeriod(Configuration conf) {
596    return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY,
597        QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
598  }
599
600  /**
601   * Extracts the initial delay for the chore from the configuration.
602   *
603   * @param conf The configuration object.
604   * @return The configured chore initial delay or the default value in the given timeunit.
605   * @see #getTimeUnit(Configuration)
606   */
607  static long getInitialDelay(Configuration conf) {
608    return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY,
609        QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
610  }
611
612  /**
613   * Extracts the time unit for the chore period and initial delay from the configuration. The
614   * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
615   * a {@link TimeUnit} value.
616   *
617   * @param conf The configuration object.
618   * @return The configured time unit for the chore period and initial delay or the default value.
619   */
620  static TimeUnit getTimeUnit(Configuration conf) {
621    return TimeUnit.valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY,
622        QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
623  }
624
625  /**
626   * Extracts the percent of Regions for a table to have been reported to enable quota violation
627   * state change.
628   *
629   * @param conf The configuration object.
630   * @return The percent of regions reported to use.
631   */
632  static Double getRegionReportPercent(Configuration conf) {
633    return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
634        QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
635  }
636
637  /**
638   * A container which encapsulates the tables that have either a table quota or are contained in a
639   * namespace which have a namespace quota.
640   */
641  static class TablesWithQuotas {
642    private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
643    private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
644    private final Connection conn;
645    private final Configuration conf;
646
647    public TablesWithQuotas(Connection conn, Configuration conf) {
648      this.conn = Objects.requireNonNull(conn);
649      this.conf = Objects.requireNonNull(conf);
650    }
651
652    Configuration getConfiguration() {
653      return conf;
654    }
655
656    /**
657     * Adds a table with a table quota.
658     */
659    public void addTableQuotaTable(TableName tn) {
660      tablesWithTableQuotas.add(tn);
661    }
662
663    /**
664     * Adds a table with a namespace quota.
665     */
666    public void addNamespaceQuotaTable(TableName tn) {
667      tablesWithNamespaceQuotas.add(tn);
668    }
669
670    /**
671     * Returns true if the given table has a table quota.
672     */
673    public boolean hasTableQuota(TableName tn) {
674      return tablesWithTableQuotas.contains(tn);
675    }
676
677    /**
678     * Returns true if the table exists in a namespace with a namespace quota.
679     */
680    public boolean hasNamespaceQuota(TableName tn) {
681      return tablesWithNamespaceQuotas.contains(tn);
682    }
683
684    /**
685     * Returns an unmodifiable view of all tables with table quotas.
686     */
687    public Set<TableName> getTableQuotaTables() {
688      return Collections.unmodifiableSet(tablesWithTableQuotas);
689    }
690
691    /**
692     * Returns an unmodifiable view of all tables in namespaces that have
693     * namespace quotas.
694     */
695    public Set<TableName> getNamespaceQuotaTables() {
696      return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
697    }
698
699    public Set<String> getNamespacesWithQuotas() {
700      Set<String> namespaces = new HashSet<>();
701      for (TableName tn : tablesWithNamespaceQuotas) {
702        namespaces.add(tn.getNamespaceAsString());
703      }
704      return namespaces;
705    }
706
707    /**
708     * Returns a view of all tables that reside in a namespace with a namespace
709     * quota, grouped by the namespace itself.
710     */
711    public Multimap<String,TableName> getTablesByNamespace() {
712      Multimap<String,TableName> tablesByNS = HashMultimap.create();
713      for (TableName tn : tablesWithNamespaceQuotas) {
714        tablesByNS.put(tn.getNamespaceAsString(), tn);
715      }
716      return tablesByNS;
717    }
718
719    /**
720     * Filters out all tables for which the Master currently doesn't have enough region space
721     * reports received from RegionServers yet.
722     */
723    public Set<TableName> filterInsufficientlyReportedTables(
724        QuotaSnapshotStore<TableName> tableStore) throws IOException {
725      final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
726      Set<TableName> tablesToRemove = new HashSet<>();
727      for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
728        // Don't recompute a table we've already computed
729        if (tablesToRemove.contains(table)) {
730          continue;
731        }
732        final int numRegionsInTable = getNumRegions(table);
733        // If the table doesn't exist (no regions), bail out.
734        if (numRegionsInTable == 0) {
735          if (LOG.isTraceEnabled()) {
736            LOG.trace("Filtering " + table + " because no regions were reported");
737          }
738          tablesToRemove.add(table);
739          continue;
740        }
741        final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
742        final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
743        if (ratioReported < percentRegionsReportedThreshold) {
744          if (LOG.isTraceEnabled()) {
745            LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota  + " of " +
746                numRegionsInTable + " regions were reported.");
747          }
748          tablesToRemove.add(table);
749        } else if (LOG.isTraceEnabled()) {
750          LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota  + " of " +
751              numRegionsInTable + " regions were reported.");
752        }
753      }
754      for (TableName tableToRemove : tablesToRemove) {
755        tablesWithTableQuotas.remove(tableToRemove);
756        tablesWithNamespaceQuotas.remove(tableToRemove);
757      }
758      return tablesToRemove;
759    }
760
761    /**
762     * Computes the total number of regions in a table.
763     */
764    int getNumRegions(TableName table) throws IOException {
765      List<RegionInfo> regions = this.conn.getAdmin().getRegions(table);
766      if (regions == null) {
767        return 0;
768      }
769      // Filter the region replicas if any and return the original number of regions for a table.
770      RegionReplicaUtil.removeNonDefaultRegions(regions);
771      return regions.size();
772    }
773
774    /**
775     * Computes the number of regions reported for a table.
776     */
777    int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
778        throws IOException {
779      return Iterables.size(tableStore.filterBySubject(table));
780    }
781
782    @Override
783    public String toString() {
784      final StringBuilder sb = new StringBuilder(32);
785      sb.append(getClass().getSimpleName())
786          .append(": tablesWithTableQuotas=")
787          .append(this.tablesWithTableQuotas)
788          .append(", tablesWithNamespaceQuotas=")
789          .append(this.tablesWithNamespaceQuotas);
790      return sb.toString();
791    }
792  }
793}