001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
050
051/**
052 * Reads the currently received Region filesystem-space use reports and acts on those which violate
053 * a defined quota.
054 */
055@InterfaceAudience.Private
056public class QuotaObserverChore extends ScheduledChore {
057  private static final Logger LOG = LoggerFactory.getLogger(QuotaObserverChore.class);
058  static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY = "hbase.master.quotas.observer.chore.period";
059  static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minutes in millis
060
061  static final String QUOTA_OBSERVER_CHORE_DELAY_KEY = "hbase.master.quotas.observer.chore.delay";
062  static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
063
064  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
065    "hbase.master.quotas.observer.chore.timeunit";
066  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
067
068  static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
069    "hbase.master.quotas.observer.report.percent";
070  static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT = 0.95;
071
072  static final String REGION_REPORT_RETENTION_DURATION_KEY =
073    "hbase.master.quotas.region.report.retention.millis";
074  static final long REGION_REPORT_RETENTION_DURATION_DEFAULT = 1000 * 60 * 10; // 10 minutes
075
076  private final Connection conn;
077  private final Configuration conf;
078  private final MasterQuotaManager quotaManager;
079  private final MetricsMaster metrics;
080  /*
081   * Callback that changes in quota snapshots are passed to.
082   */
083  private final SpaceQuotaSnapshotNotifier snapshotNotifier;
084
085  /*
086   * Preserves the state of quota snapshots for tables and namespaces
087   */
088  private final Map<TableName, SpaceQuotaSnapshot> tableQuotaSnapshots;
089  private final Map<TableName, SpaceQuotaSnapshot> readOnlyTableQuotaSnapshots;
090  private final Map<String, SpaceQuotaSnapshot> namespaceQuotaSnapshots;
091  private final Map<String, SpaceQuotaSnapshot> readOnlyNamespaceSnapshots;
092
093  // The time, in millis, that region reports should be kept by the master
094  private final long regionReportLifetimeMillis;
095
096  /*
097   * Encapsulates logic for tracking the state of a table/namespace WRT space quotas
098   */
099  private QuotaSnapshotStore<TableName> tableSnapshotStore;
100  private QuotaSnapshotStore<String> namespaceSnapshotStore;
101
102  public QuotaObserverChore(HMaster master, MetricsMaster metrics) {
103    this(master.getConnection(), master.getConfiguration(), master.getSpaceQuotaSnapshotNotifier(),
104      master.getMasterQuotaManager(), master, metrics);
105  }
106
107  QuotaObserverChore(Connection conn, Configuration conf,
108    SpaceQuotaSnapshotNotifier snapshotNotifier, MasterQuotaManager quotaManager, Stoppable stopper,
109    MetricsMaster metrics) {
110    super(QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf), getInitialDelay(conf),
111      getTimeUnit(conf));
112    this.conn = conn;
113    this.conf = conf;
114    this.metrics = metrics;
115    this.quotaManager = quotaManager;
116    this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
117    this.tableQuotaSnapshots = new ConcurrentHashMap<>();
118    this.readOnlyTableQuotaSnapshots = Collections.unmodifiableMap(tableQuotaSnapshots);
119    this.namespaceQuotaSnapshots = new ConcurrentHashMap<>();
120    this.readOnlyNamespaceSnapshots = Collections.unmodifiableMap(namespaceQuotaSnapshots);
121    this.regionReportLifetimeMillis =
122      conf.getLong(REGION_REPORT_RETENTION_DURATION_KEY, REGION_REPORT_RETENTION_DURATION_DEFAULT);
123  }
124
125  @Override
126  protected void chore() {
127    try {
128      if (LOG.isTraceEnabled()) {
129        LOG.trace("Refreshing space quotas in RegionServer");
130      }
131      long start = System.nanoTime();
132      _chore();
133      if (metrics != null) {
134        metrics.incrementQuotaObserverTime((System.nanoTime() - start) / 1_000_000);
135      }
136    } catch (IOException e) {
137      LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
138    }
139  }
140
141  void _chore() throws IOException {
142    // Get the total set of tables that have quotas defined. Includes table quotas
143    // and tables included by namespace quotas.
144    TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
145    if (LOG.isTraceEnabled()) {
146      LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
147    }
148
149    if (metrics != null) {
150      // Set the number of namespaces and tables with quotas defined
151      metrics.setNumSpaceQuotas(tablesWithQuotas.getTableQuotaTables().size()
152        + tablesWithQuotas.getNamespacesWithQuotas().size());
153    }
154
155    // The current "view" of region space use. Used henceforth.
156    final Map<RegionInfo, Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
157    if (LOG.isTraceEnabled()) {
158      LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports: "
159        + reportedRegionSpaceUse);
160    }
161
162    // Remove the "old" region reports
163    pruneOldRegionReports();
164
165    // Create the stores to track table and namespace snapshots
166    initializeSnapshotStores(reportedRegionSpaceUse);
167    // Report the number of (non-expired) region size reports
168    if (metrics != null) {
169      metrics.setNumRegionSizeReports(reportedRegionSpaceUse.size());
170    }
171
172    // Filter out tables for which we don't have adequate regionspace reports yet.
173    // Important that we do this after we instantiate the stores above
174    // This gives us a set of Tables which may or may not be violating their quota.
175    // To be safe, we want to make sure that these are not in violation.
176    Set<TableName> tablesInLimbo =
177      tablesWithQuotas.filterInsufficientlyReportedTables(tableSnapshotStore);
178
179    if (LOG.isTraceEnabled()) {
180      LOG.trace("Filtered insufficiently reported tables, left with "
181        + reportedRegionSpaceUse.size() + " regions reported");
182    }
183
184    for (TableName tableInLimbo : tablesInLimbo) {
185      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
186      SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
187      if (currentStatus.isInViolation()) {
188        if (LOG.isTraceEnabled()) {
189          LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
190            + " reported than required.");
191        }
192        SpaceQuotaSnapshot targetSnapshot =
193          new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), currentSnapshot.getUsage(),
194            currentSnapshot.getLimit());
195        this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
196        // Update it in the Table QuotaStore so that memory is consistent with no violation.
197        tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
198        // In case of Disable SVP, we need to enable the table as it moves out of violation
199        if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy().orElse(null)) {
200          QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
201        }
202      }
203    }
204
205    // Transition each table to/from quota violation based on the current and target state.
206    // Only table quotas are enacted.
207    final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
208    processTablesWithQuotas(tablesWithTableQuotas);
209
210    // For each Namespace quota, transition each table in the namespace in or out of violation
211    // only if a table quota violation policy has not already been applied.
212    final Set<String> namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas();
213    final Multimap<String, TableName> tablesByNamespace = tablesWithQuotas.getTablesByNamespace();
214    processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
215  }
216
217  void initializeSnapshotStores(Map<RegionInfo, Long> regionSizes) {
218    Map<RegionInfo, Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
219    if (tableSnapshotStore == null) {
220      tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
221    } else {
222      tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
223    }
224    if (namespaceSnapshotStore == null) {
225      namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
226    } else {
227      namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
228    }
229  }
230
231  /**
232   * Processes each {@code TableName} which has a quota defined and moves it in or out of violation
233   * based on the space use.
234   * @param tablesWithTableQuotas The HBase tables which have quotas defined
235   */
236  void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
237    long numTablesInViolation = 0L;
238    for (TableName table : tablesWithTableQuotas) {
239      final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
240      if (spaceQuota == null) {
241        if (LOG.isDebugEnabled()) {
242          LOG.debug("Unexpectedly did not find a space quota for " + table
243            + ", maybe it was recently deleted.");
244        }
245        continue;
246      }
247      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
248      final SpaceQuotaSnapshot targetSnapshot =
249        tableSnapshotStore.getTargetState(table, spaceQuota);
250      if (LOG.isTraceEnabled()) {
251        LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target="
252          + targetSnapshot);
253      }
254      updateTableQuota(table, currentSnapshot, targetSnapshot);
255
256      if (targetSnapshot.getQuotaStatus().isInViolation()) {
257        numTablesInViolation++;
258      }
259    }
260    // Report the number of tables in violation
261    if (metrics != null) {
262      metrics.setNumTableInSpaceQuotaViolation(numTablesInViolation);
263    }
264  }
265
266  /**
267   * Processes each namespace which has a quota defined and moves all of the tables contained in
268   * that namespace into or out of violation of the quota. Tables which are already in violation of
269   * a quota at the table level which <em>also</em> have a reside in a namespace with a violated
270   * quota will not have the namespace quota enacted. The table quota takes priority over the
271   * namespace quota.
272   * @param namespacesWithQuotas The set of namespaces that have quotas defined
273   * @param tablesByNamespace    A mapping of namespaces and the tables contained in those
274   *                             namespaces
275   */
276  void processNamespacesWithQuotas(final Set<String> namespacesWithQuotas,
277    final Multimap<String, TableName> tablesByNamespace) throws IOException {
278    long numNamespacesInViolation = 0L;
279    for (String namespace : namespacesWithQuotas) {
280      // Get the quota definition for the namespace
281      final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
282      if (spaceQuota == null) {
283        if (LOG.isDebugEnabled()) {
284          LOG.debug("Could not get Namespace space quota for " + namespace
285            + ", maybe it was recently deleted.");
286        }
287        continue;
288      }
289      final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
290      final SpaceQuotaSnapshot targetSnapshot =
291        namespaceSnapshotStore.getTargetState(namespace, spaceQuota);
292      if (LOG.isTraceEnabled()) {
293        LOG.trace("Processing " + namespace + " with current=" + currentSnapshot + ", target="
294          + targetSnapshot);
295      }
296      updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
297
298      if (targetSnapshot.getQuotaStatus().isInViolation()) {
299        numNamespacesInViolation++;
300      }
301    }
302
303    // Report the number of namespaces in violation
304    if (metrics != null) {
305      metrics.setNumNamespacesInSpaceQuotaViolation(numNamespacesInViolation);
306    }
307  }
308
309  /**
310   * Updates the hbase:quota table with the new quota policy for this <code>table</code> if
311   * necessary.
312   * @param table           The table being checked
313   * @param currentSnapshot The state of the quota on this table from the previous invocation.
314   * @param targetSnapshot  The state the quota should be in for this table.
315   */
316  void updateTableQuota(TableName table, SpaceQuotaSnapshot currentSnapshot,
317    SpaceQuotaSnapshot targetSnapshot) throws IOException {
318    final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
319    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
320
321    // If we're changing something, log it.
322    if (!currentSnapshot.equals(targetSnapshot)) {
323      this.snapshotNotifier.transitionTable(table, targetSnapshot);
324      // Update it in memory
325      tableSnapshotStore.setCurrentState(table, targetSnapshot);
326
327      // If the target is none, we're moving out of violation. Update the hbase:quota table
328      SpaceViolationPolicy currPolicy = currentStatus.getPolicy().orElse(null);
329      SpaceViolationPolicy targetPolicy = targetStatus.getPolicy().orElse(null);
330      if (!targetStatus.isInViolation()) {
331        // In case of Disable SVP, we need to enable the table as it moves out of violation
332        if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
333          QuotaUtil.enableTableIfNotEnabled(conn, table);
334        }
335        if (LOG.isDebugEnabled()) {
336          LOG.debug(table + " moved into observance of table space quota.");
337        }
338      } else {
339        // We're either moving into violation or changing violation policies
340        if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
341          // In case of policy switch, we need to enable the table if current policy is Disable SVP
342          QuotaUtil.enableTableIfNotEnabled(conn, table);
343        } else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
344          // In case of Disable SVP, we need to disable the table as it moves into violation
345          QuotaUtil.disableTableIfNotDisabled(conn, table);
346        }
347        if (LOG.isDebugEnabled()) {
348          LOG.debug(
349            table + " moved into violation of table space quota with policy of " + targetPolicy);
350        }
351      }
352    } else if (LOG.isTraceEnabled()) {
353      // Policies are the same, so we have nothing to do except log this. Don't need to re-update
354      // the quota table
355      if (!currentStatus.isInViolation()) {
356        LOG.trace(table + " remains in observance of quota.");
357      } else {
358        LOG.trace(table + " remains in violation of quota.");
359      }
360    }
361  }
362
363  /**
364   * Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
365   * case, currPolicy or/and targetPolicy will be having DISABLE policy.
366   * @param currPolicy   currently set space violation policy
367   * @param targetPolicy new space violation policy
368   * @return true if is DISABLE space violation policy; otherwise false
369   */
370  private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
371    final SpaceViolationPolicy targetPolicy) {
372    return SpaceViolationPolicy.DISABLE == currPolicy
373      || SpaceViolationPolicy.DISABLE == targetPolicy;
374  }
375
376  /**
377   * Updates the hbase:quota table with the target quota policy for this <code>namespace</code> if
378   * necessary.
379   * @param namespace         The namespace being checked
380   * @param currentSnapshot   The state of the quota on this namespace from the previous invocation
381   * @param targetSnapshot    The state the quota should be in for this namespace
382   * @param tablesByNamespace A mapping of tables in namespaces.
383   */
384  void updateNamespaceQuota(String namespace, SpaceQuotaSnapshot currentSnapshot,
385    SpaceQuotaSnapshot targetSnapshot, final Multimap<String, TableName> tablesByNamespace)
386    throws IOException {
387    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
388
389    // When the policies differ, we need to move into or out of violation
390    if (!currentSnapshot.equals(targetSnapshot)) {
391      // We want to have a policy of "NONE", moving out of violation
392      if (!targetStatus.isInViolation()) {
393        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
394          // If there is a quota on this table in violation
395          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
396            // Table-level quota violation policy is being applied here.
397            if (LOG.isTraceEnabled()) {
398              LOG.trace("Not activating Namespace violation policy because a Table violation"
399                + " policy is already in effect for " + tableInNS);
400            }
401          } else {
402            LOG.info(tableInNS + " moving into observance of namespace space quota");
403            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
404          }
405        }
406        // We want to move into violation at the NS level
407      } else {
408        // Moving tables in the namespace into violation or to a different violation policy
409        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
410          final SpaceQuotaSnapshot tableQuotaSnapshot =
411            tableSnapshotStore.getCurrentState(tableInNS);
412          final boolean hasTableQuota =
413            !Objects.equals(QuotaSnapshotStore.NO_QUOTA, tableQuotaSnapshot);
414          if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
415            // Table-level quota violation policy is being applied here.
416            if (LOG.isTraceEnabled()) {
417              LOG.trace("Not activating Namespace violation policy because a Table violation"
418                + " policy is already in effect for " + tableInNS);
419            }
420          } else {
421            // No table quota present or a table quota present that is not in violation
422            LOG.info(tableInNS + " moving into violation of namespace space quota with policy "
423              + targetStatus.getPolicy());
424            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
425          }
426        }
427      }
428      // Update the new state in memory for this namespace
429      namespaceSnapshotStore.setCurrentState(namespace, targetSnapshot);
430    } else {
431      // Policies are the same
432      if (!targetStatus.isInViolation()) {
433        // Both are NONE, so we remain in observance
434        if (LOG.isTraceEnabled()) {
435          LOG.trace(namespace + " remains in observance of quota.");
436        }
437      } else {
438        // Namespace quota is still in violation, need to enact if the table quota is not
439        // taking priority.
440        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
441          // Does a table policy exist
442          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
443            // Table-level quota violation policy is being applied here.
444            if (LOG.isTraceEnabled()) {
445              LOG.trace("Not activating Namespace violation policy because Table violation"
446                + " policy is already in effect for " + tableInNS);
447            }
448          } else {
449            // No table policy, so enact namespace policy
450            LOG.info(tableInNS + " moving into violation of namespace space quota");
451            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
452          }
453        }
454      }
455    }
456  }
457
458  /**
459   * Removes region reports over a certain age.
460   */
461  void pruneOldRegionReports() {
462    final long now = EnvironmentEdgeManager.currentTime();
463    final long pruneTime = now - regionReportLifetimeMillis;
464    final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime, this);
465    if (LOG.isTraceEnabled()) {
466      LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
467        + pruneTime + ".");
468    }
469  }
470
471  /**
472   * Computes the set of all tables that have quotas defined. This includes tables with quotas
473   * explicitly set on them, in addition to tables that exist namespaces which have a quota defined.
474   */
475  TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
476    final Scan scan = QuotaTableUtil.makeScan(null);
477    final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
478    try (final QuotaRetriever scanner = new QuotaRetriever()) {
479      scanner.init(conn, scan);
480      for (QuotaSettings quotaSettings : scanner) {
481        // Only one of namespace and tablename should be 'null'
482        final String namespace = quotaSettings.getNamespace();
483        final TableName tableName = quotaSettings.getTableName();
484        if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
485          continue;
486        }
487
488        if (namespace != null) {
489          assert tableName == null;
490          // Collect all of the tables in the namespace
491          TableName[] tablesInNS = conn.getAdmin().listTableNamesByNamespace(namespace);
492          for (TableName tableUnderNs : tablesInNS) {
493            if (LOG.isTraceEnabled()) {
494              LOG.trace(
495                "Adding " + tableUnderNs + " under " + namespace + " as having a namespace quota");
496            }
497            tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
498          }
499        } else {
500          assert tableName != null;
501          if (LOG.isTraceEnabled()) {
502            LOG.trace("Adding " + tableName + " as having table quota.");
503          }
504          // namespace is already null, must be a non-null tableName
505          tablesWithQuotas.addTableQuotaTable(tableName);
506        }
507      }
508      return tablesWithQuotas;
509    }
510  }
511
512  QuotaSnapshotStore<TableName> getTableSnapshotStore() {
513    return tableSnapshotStore;
514  }
515
516  QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
517    return namespaceSnapshotStore;
518  }
519
520  /**
521   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects for each HBase
522   * table with a quota defined.
523   */
524  public Map<TableName, SpaceQuotaSnapshot> getTableQuotaSnapshots() {
525    return readOnlyTableQuotaSnapshots;
526  }
527
528  /**
529   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects for each HBase
530   * namespace with a quota defined.
531   */
532  public Map<String, SpaceQuotaSnapshot> getNamespaceQuotaSnapshots() {
533    return readOnlyNamespaceSnapshots;
534  }
535
536  /**
537   * Fetches the {@link SpaceQuotaSnapshot} for the given table.
538   */
539  SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
540    SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
541    if (state == null) {
542      // No tracked state implies observance.
543      return QuotaSnapshotStore.NO_QUOTA;
544    }
545    return state;
546  }
547
548  /**
549   * Stores the quota state for the given table.
550   */
551  void setTableQuotaSnapshot(TableName table, SpaceQuotaSnapshot snapshot) {
552    this.tableQuotaSnapshots.put(table, snapshot);
553  }
554
555  /**
556   * Fetches the {@link SpaceQuotaSnapshot} for the given namespace from this chore.
557   */
558  SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
559    SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
560    if (state == null) {
561      // No tracked state implies observance.
562      return QuotaSnapshotStore.NO_QUOTA;
563    }
564    return state;
565  }
566
567  /**
568   * Stores the given {@code snapshot} for the given {@code namespace} in this chore.
569   */
570  void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
571    this.namespaceQuotaSnapshots.put(namespace, snapshot);
572  }
573
574  /**
575   * Extracts the period for the chore from the configuration.
576   * @param conf The configuration object.
577   * @return The configured chore period or the default value in the given timeunit.
578   * @see #getTimeUnit(Configuration)
579   */
580  static int getPeriod(Configuration conf) {
581    return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY, QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
582  }
583
584  /**
585   * Extracts the initial delay for the chore from the configuration.
586   * @param conf The configuration object.
587   * @return The configured chore initial delay or the default value in the given timeunit.
588   * @see #getTimeUnit(Configuration)
589   */
590  static long getInitialDelay(Configuration conf) {
591    return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY, QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
592  }
593
594  /**
595   * Extracts the time unit for the chore period and initial delay from the configuration. The
596   * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to a
597   * {@link TimeUnit} value.
598   * @param conf The configuration object.
599   * @return The configured time unit for the chore period and initial delay or the default value.
600   */
601  static TimeUnit getTimeUnit(Configuration conf) {
602    return TimeUnit
603      .valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY, QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
604  }
605
606  /**
607   * Extracts the percent of Regions for a table to have been reported to enable quota violation
608   * state change.
609   * @param conf The configuration object.
610   * @return The percent of regions reported to use.
611   */
612  static Double getRegionReportPercent(Configuration conf) {
613    return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
614      QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
615  }
616
617  /**
618   * A container which encapsulates the tables that have either a table quota or are contained in a
619   * namespace which have a namespace quota.
620   */
621  static class TablesWithQuotas {
622    private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
623    private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
624    private final Connection conn;
625    private final Configuration conf;
626
627    public TablesWithQuotas(Connection conn, Configuration conf) {
628      this.conn = Objects.requireNonNull(conn);
629      this.conf = Objects.requireNonNull(conf);
630    }
631
632    Configuration getConfiguration() {
633      return conf;
634    }
635
636    /**
637     * Adds a table with a table quota.
638     */
639    public void addTableQuotaTable(TableName tn) {
640      tablesWithTableQuotas.add(tn);
641    }
642
643    /**
644     * Adds a table with a namespace quota.
645     */
646    public void addNamespaceQuotaTable(TableName tn) {
647      tablesWithNamespaceQuotas.add(tn);
648    }
649
650    /**
651     * Returns true if the given table has a table quota.
652     */
653    public boolean hasTableQuota(TableName tn) {
654      return tablesWithTableQuotas.contains(tn);
655    }
656
657    /**
658     * Returns true if the table exists in a namespace with a namespace quota.
659     */
660    public boolean hasNamespaceQuota(TableName tn) {
661      return tablesWithNamespaceQuotas.contains(tn);
662    }
663
664    /**
665     * Returns an unmodifiable view of all tables with table quotas.
666     */
667    public Set<TableName> getTableQuotaTables() {
668      return Collections.unmodifiableSet(tablesWithTableQuotas);
669    }
670
671    /**
672     * Returns an unmodifiable view of all tables in namespaces that have namespace quotas.
673     */
674    public Set<TableName> getNamespaceQuotaTables() {
675      return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
676    }
677
678    public Set<String> getNamespacesWithQuotas() {
679      Set<String> namespaces = new HashSet<>();
680      for (TableName tn : tablesWithNamespaceQuotas) {
681        namespaces.add(tn.getNamespaceAsString());
682      }
683      return namespaces;
684    }
685
686    /**
687     * Returns a view of all tables that reside in a namespace with a namespace quota, grouped by
688     * the namespace itself.
689     */
690    public Multimap<String, TableName> getTablesByNamespace() {
691      Multimap<String, TableName> tablesByNS = HashMultimap.create();
692      for (TableName tn : tablesWithNamespaceQuotas) {
693        tablesByNS.put(tn.getNamespaceAsString(), tn);
694      }
695      return tablesByNS;
696    }
697
698    /**
699     * Filters out all tables for which the Master currently doesn't have enough region space
700     * reports received from RegionServers yet.
701     */
702    public Set<TableName> filterInsufficientlyReportedTables(
703      QuotaSnapshotStore<TableName> tableStore) throws IOException {
704      final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
705      Set<TableName> tablesToRemove = new HashSet<>();
706      for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
707        // Don't recompute a table we've already computed
708        if (tablesToRemove.contains(table)) {
709          continue;
710        }
711        final int numRegionsInTable = getNumRegions(table);
712        // If the table doesn't exist (no regions), bail out.
713        if (numRegionsInTable == 0) {
714          if (LOG.isTraceEnabled()) {
715            LOG.trace("Filtering " + table + " because no regions were reported");
716          }
717          tablesToRemove.add(table);
718          continue;
719        }
720        final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
721        final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
722        if (ratioReported < percentRegionsReportedThreshold) {
723          if (LOG.isTraceEnabled()) {
724            LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota + " of "
725              + numRegionsInTable + " regions were reported.");
726          }
727          tablesToRemove.add(table);
728        } else if (LOG.isTraceEnabled()) {
729          LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota + " of "
730            + numRegionsInTable + " regions were reported.");
731        }
732      }
733      for (TableName tableToRemove : tablesToRemove) {
734        tablesWithTableQuotas.remove(tableToRemove);
735        tablesWithNamespaceQuotas.remove(tableToRemove);
736      }
737      return tablesToRemove;
738    }
739
740    /**
741     * Computes the total number of regions in a table.
742     */
743    int getNumRegions(TableName table) throws IOException {
744      List<RegionInfo> regions = this.conn.getAdmin().getRegions(table);
745      if (regions == null) {
746        return 0;
747      }
748      // Filter the region replicas if any and return the original number of regions for a table.
749      RegionReplicaUtil.removeNonDefaultRegions(regions);
750      return regions.size();
751    }
752
753    /**
754     * Computes the number of regions reported for a table.
755     */
756    int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
757      throws IOException {
758      return Iterables.size(tableStore.filterBySubject(table));
759    }
760
761    @Override
762    public String toString() {
763      final StringBuilder sb = new StringBuilder(32);
764      sb.append(getClass().getSimpleName()).append(": tablesWithTableQuotas=")
765        .append(this.tablesWithTableQuotas).append(", tablesWithNamespaceQuotas=")
766        .append(this.tablesWithNamespaceQuotas);
767      return sb.toString();
768    }
769  }
770}