001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.HashSet;
022import java.util.List;
023import java.util.Map;
024import java.util.Objects;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ScheduledChore;
030import org.apache.hadoop.hbase.Stoppable;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionReplicaUtil;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.MetricsMaster;
038import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
045import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
046import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
047
048import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
049
050/**
051 * Reads the currently received Region filesystem-space use reports and acts on those which
052 * violate a defined quota.
053 */
054@InterfaceAudience.Private
055public class QuotaObserverChore extends ScheduledChore {
056  private static final Logger LOG = LoggerFactory.getLogger(QuotaObserverChore.class);
057  static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY =
058      "hbase.master.quotas.observer.chore.period";
059  static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minutes in millis
060
061  static final String QUOTA_OBSERVER_CHORE_DELAY_KEY =
062      "hbase.master.quotas.observer.chore.delay";
063  static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
064
065  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
066      "hbase.master.quotas.observer.chore.timeunit";
067  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
068
069  static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
070      "hbase.master.quotas.observer.report.percent";
071  static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
072
073  static final String REGION_REPORT_RETENTION_DURATION_KEY =
074      "hbase.master.quotas.region.report.retention.millis";
075  static final long REGION_REPORT_RETENTION_DURATION_DEFAULT =
076      1000 * 60 * 10; // 10 minutes
077
078  private final Connection conn;
079  private final Configuration conf;
080  private final MasterQuotaManager quotaManager;
081  private final MetricsMaster metrics;
082  /*
083   * Callback that changes in quota snapshots are passed to.
084   */
085  private final SpaceQuotaSnapshotNotifier snapshotNotifier;
086
087  /*
088   * Preserves the state of quota snapshots for tables and namespaces
089   */
090  private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots;
091  private final Map<TableName,SpaceQuotaSnapshot> readOnlyTableQuotaSnapshots;
092  private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots;
093  private final Map<String,SpaceQuotaSnapshot> readOnlyNamespaceSnapshots;
094
095  // The time, in millis, that region reports should be kept by the master
096  private final long regionReportLifetimeMillis;
097
098  /*
099   * Encapsulates logic for tracking the state of a table/namespace WRT space quotas
100   */
101  private QuotaSnapshotStore<TableName> tableSnapshotStore;
102  private QuotaSnapshotStore<String> namespaceSnapshotStore;
103
104  public QuotaObserverChore(HMaster master, MetricsMaster metrics) {
105    this(
106        master.getConnection(), master.getConfiguration(),
107        master.getSpaceQuotaSnapshotNotifier(), master.getMasterQuotaManager(),
108        master, metrics);
109  }
110
111  QuotaObserverChore(
112      Connection conn, Configuration conf, SpaceQuotaSnapshotNotifier snapshotNotifier,
113      MasterQuotaManager quotaManager, Stoppable stopper, MetricsMaster metrics) {
114    super(
115        QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
116        getInitialDelay(conf), getTimeUnit(conf));
117    this.conn = conn;
118    this.conf = conf;
119    this.metrics = metrics;
120    this.quotaManager = quotaManager;
121    this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
122    this.tableQuotaSnapshots = new ConcurrentHashMap<>();
123    this.readOnlyTableQuotaSnapshots = Collections.unmodifiableMap(tableQuotaSnapshots);
124    this.namespaceQuotaSnapshots = new ConcurrentHashMap<>();
125    this.readOnlyNamespaceSnapshots = Collections.unmodifiableMap(namespaceQuotaSnapshots);
126    this.regionReportLifetimeMillis = conf.getLong(
127        REGION_REPORT_RETENTION_DURATION_KEY, REGION_REPORT_RETENTION_DURATION_DEFAULT);
128  }
129
130  @Override
131  protected void chore() {
132    try {
133      if (LOG.isTraceEnabled()) {
134        LOG.trace("Refreshing space quotas in RegionServer");
135      }
136      long start = System.nanoTime();
137      _chore();
138      if (metrics != null) {
139        metrics.incrementQuotaObserverTime((System.nanoTime() - start) / 1_000_000);
140      }
141    } catch (IOException e) {
142      LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
143    }
144  }
145
146  void _chore() throws IOException {
147    // Get the total set of tables that have quotas defined. Includes table quotas
148    // and tables included by namespace quotas.
149    TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
150    if (LOG.isTraceEnabled()) {
151      LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
152    }
153
154    if (metrics != null) {
155      // Set the number of namespaces and tables with quotas defined
156      metrics.setNumSpaceQuotas(tablesWithQuotas.getTableQuotaTables().size()
157          + tablesWithQuotas.getNamespacesWithQuotas().size());
158    }
159
160    // The current "view" of region space use. Used henceforth.
161    final Map<RegionInfo,Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
162    if (LOG.isTraceEnabled()) {
163      LOG.trace(
164          "Using " + reportedRegionSpaceUse.size() + " region space use reports: " +
165          reportedRegionSpaceUse);
166    }
167
168    // Remove the "old" region reports
169    pruneOldRegionReports();
170
171    // Create the stores to track table and namespace snapshots
172    initializeSnapshotStores(reportedRegionSpaceUse);
173    // Report the number of (non-expired) region size reports
174    if (metrics != null) {
175      metrics.setNumRegionSizeReports(reportedRegionSpaceUse.size());
176    }
177
178    // Filter out tables for which we don't have adequate regionspace reports yet.
179    // Important that we do this after we instantiate the stores above
180    // This gives us a set of Tables which may or may not be violating their quota.
181    // To be safe, we want to make sure that these are not in violation.
182    Set<TableName> tablesInLimbo = tablesWithQuotas.filterInsufficientlyReportedTables(
183        tableSnapshotStore);
184
185    if (LOG.isTraceEnabled()) {
186      LOG.trace("Filtered insufficiently reported tables, left with " +
187          reportedRegionSpaceUse.size() + " regions reported");
188    }
189
190    for (TableName tableInLimbo : tablesInLimbo) {
191      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
192      SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
193      if (currentStatus.isInViolation()) {
194        if (LOG.isTraceEnabled()) {
195          LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
196              + " reported than required.");
197        }
198        SpaceQuotaSnapshot targetSnapshot = new SpaceQuotaSnapshot(
199            SpaceQuotaStatus.notInViolation(), currentSnapshot.getUsage(),
200            currentSnapshot.getLimit());
201        this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
202        // Update it in the Table QuotaStore so that memory is consistent with no violation.
203        tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
204        // In case of Disable SVP, we need to enable the table as it moves out of violation
205        if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy().orElse(null)) {
206          QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
207        }
208      }
209    }
210
211    // Transition each table to/from quota violation based on the current and target state.
212    // Only table quotas are enacted.
213    final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
214    processTablesWithQuotas(tablesWithTableQuotas);
215
216    // For each Namespace quota, transition each table in the namespace in or out of violation
217    // only if a table quota violation policy has not already been applied.
218    final Set<String> namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas();
219    final Multimap<String,TableName> tablesByNamespace = tablesWithQuotas.getTablesByNamespace();
220    processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
221  }
222
223  void initializeSnapshotStores(Map<RegionInfo,Long> regionSizes) {
224    Map<RegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
225    if (tableSnapshotStore == null) {
226      tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
227    } else {
228      tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
229    }
230    if (namespaceSnapshotStore == null) {
231      namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(
232          conn, this, immutableRegionSpaceUse);
233    } else {
234      namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
235    }
236  }
237
238  /**
239   * Processes each {@code TableName} which has a quota defined and moves it in or out of
240   * violation based on the space use.
241   *
242   * @param tablesWithTableQuotas The HBase tables which have quotas defined
243   */
244  void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
245    long numTablesInViolation = 0L;
246    for (TableName table : tablesWithTableQuotas) {
247      final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
248      if (spaceQuota == null) {
249        if (LOG.isDebugEnabled()) {
250          LOG.debug("Unexpectedly did not find a space quota for " + table
251              + ", maybe it was recently deleted.");
252        }
253        continue;
254      }
255      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
256      final SpaceQuotaSnapshot targetSnapshot = tableSnapshotStore.getTargetState(table, spaceQuota);
257      if (LOG.isTraceEnabled()) {
258        LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target="
259            + targetSnapshot);
260      }
261      updateTableQuota(table, currentSnapshot, targetSnapshot);
262
263      if (targetSnapshot.getQuotaStatus().isInViolation()) {
264        numTablesInViolation++;
265      }
266    }
267    // Report the number of tables in violation
268    if (metrics != null) {
269      metrics.setNumTableInSpaceQuotaViolation(numTablesInViolation);
270    }
271  }
272
273  /**
274   * Processes each namespace which has a quota defined and moves all of the tables contained
275   * in that namespace into or out of violation of the quota. Tables which are already in
276   * violation of a quota at the table level which <em>also</em> have a reside in a namespace
277   * with a violated quota will not have the namespace quota enacted. The table quota takes
278   * priority over the namespace quota.
279   *
280   * @param namespacesWithQuotas The set of namespaces that have quotas defined
281   * @param tablesByNamespace A mapping of namespaces and the tables contained in those namespaces
282   */
283  void processNamespacesWithQuotas(
284      final Set<String> namespacesWithQuotas,
285      final Multimap<String,TableName> tablesByNamespace) throws IOException {
286    long numNamespacesInViolation = 0L;
287    for (String namespace : namespacesWithQuotas) {
288      // Get the quota definition for the namespace
289      final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
290      if (spaceQuota == null) {
291        if (LOG.isDebugEnabled()) {
292          LOG.debug("Could not get Namespace space quota for " + namespace
293              + ", maybe it was recently deleted.");
294        }
295        continue;
296      }
297      final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
298      final SpaceQuotaSnapshot targetSnapshot = namespaceSnapshotStore.getTargetState(
299          namespace, spaceQuota);
300      if (LOG.isTraceEnabled()) {
301        LOG.trace("Processing " + namespace + " with current=" + currentSnapshot + ", target="
302            + targetSnapshot);
303      }
304      updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
305
306      if (targetSnapshot.getQuotaStatus().isInViolation()) {
307        numNamespacesInViolation++;
308      }
309    }
310
311    // Report the number of namespaces in violation
312    if (metrics != null) {
313      metrics.setNumNamespacesInSpaceQuotaViolation(numNamespacesInViolation);
314    }
315  }
316
317  /**
318   * Updates the hbase:quota table with the new quota policy for this <code>table</code>
319   * if necessary.
320   *
321   * @param table The table being checked
322   * @param currentSnapshot The state of the quota on this table from the previous invocation.
323   * @param targetSnapshot The state the quota should be in for this table.
324   */
325  void updateTableQuota(
326      TableName table, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot)
327          throws IOException {
328    final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
329    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
330
331    // If we're changing something, log it.
332    if (!currentSnapshot.equals(targetSnapshot)) {
333      this.snapshotNotifier.transitionTable(table, targetSnapshot);
334      // Update it in memory
335      tableSnapshotStore.setCurrentState(table, targetSnapshot);
336
337      // If the target is none, we're moving out of violation. Update the hbase:quota table
338      SpaceViolationPolicy currPolicy = currentStatus.getPolicy().orElse(null);
339      SpaceViolationPolicy targetPolicy = targetStatus.getPolicy().orElse(null);
340      if (!targetStatus.isInViolation()) {
341        // In case of Disable SVP, we need to enable the table as it moves out of violation
342        if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
343          QuotaUtil.enableTableIfNotEnabled(conn, table);
344        }
345        if (LOG.isDebugEnabled()) {
346          LOG.debug(table + " moved into observance of table space quota.");
347        }
348      } else {
349        // We're either moving into violation or changing violation policies
350        if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
351          // In case of policy switch, we need to enable the table if current policy is Disable SVP
352          QuotaUtil.enableTableIfNotEnabled(conn, table);
353        } else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
354          // In case of Disable SVP, we need to disable the table as it moves into violation
355          QuotaUtil.disableTableIfNotDisabled(conn, table);
356        }
357        if (LOG.isDebugEnabled()) {
358          LOG.debug(
359            table + " moved into violation of table space quota with policy of " + targetPolicy);
360        }
361      }
362    } else if (LOG.isTraceEnabled()) {
363      // Policies are the same, so we have nothing to do except log this. Don't need to re-update
364      // the quota table
365      if (!currentStatus.isInViolation()) {
366        LOG.trace(table + " remains in observance of quota.");
367      } else {
368        LOG.trace(table + " remains in violation of quota.");
369      }
370    }
371  }
372
373  /**
374   * Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
375   * case, currPolicy or/and targetPolicy will be having DISABLE policy.
376   * @param currPolicy currently set space violation policy
377   * @param targetPolicy new space violation policy
378   * @return true if is DISABLE space violation policy; otherwise false
379   */
380  private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
381      final SpaceViolationPolicy targetPolicy) {
382    return SpaceViolationPolicy.DISABLE == currPolicy
383        || SpaceViolationPolicy.DISABLE == targetPolicy;
384  }
385
386  /**
387   * Updates the hbase:quota table with the target quota policy for this <code>namespace</code>
388   * if necessary.
389   *
390   * @param namespace The namespace being checked
391   * @param currentSnapshot The state of the quota on this namespace from the previous invocation
392   * @param targetSnapshot The state the quota should be in for this namespace
393   * @param tablesByNamespace A mapping of tables in namespaces.
394   */
395  void updateNamespaceQuota(
396      String namespace, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot,
397      final Multimap<String,TableName> tablesByNamespace) throws IOException {
398    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
399
400    // When the policies differ, we need to move into or out of violation
401    if (!currentSnapshot.equals(targetSnapshot)) {
402      // We want to have a policy of "NONE", moving out of violation
403      if (!targetStatus.isInViolation()) {
404        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
405          // If there is a quota on this table in violation
406          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
407            // Table-level quota violation policy is being applied here.
408            if (LOG.isTraceEnabled()) {
409              LOG.trace("Not activating Namespace violation policy because a Table violation"
410                  + " policy is already in effect for " + tableInNS);
411            }
412          } else {
413            LOG.info(tableInNS + " moving into observance of namespace space quota");
414            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
415          }
416        }
417      // We want to move into violation at the NS level
418      } else {
419        // Moving tables in the namespace into violation or to a different violation policy
420        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
421          final SpaceQuotaSnapshot tableQuotaSnapshot =
422                tableSnapshotStore.getCurrentState(tableInNS);
423          final boolean hasTableQuota =
424              !Objects.equals(QuotaSnapshotStore.NO_QUOTA, tableQuotaSnapshot);
425          if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
426            // Table-level quota violation policy is being applied here.
427            if (LOG.isTraceEnabled()) {
428              LOG.trace("Not activating Namespace violation policy because a Table violation"
429                  + " policy is already in effect for " + tableInNS);
430            }
431          } else {
432            // No table quota present or a table quota present that is not in violation
433            LOG.info(tableInNS + " moving into violation of namespace space quota with policy "
434                + targetStatus.getPolicy());
435            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
436          }
437        }
438      }
439      // Update the new state in memory for this namespace
440      namespaceSnapshotStore.setCurrentState(namespace, targetSnapshot);
441    } else {
442      // Policies are the same
443      if (!targetStatus.isInViolation()) {
444        // Both are NONE, so we remain in observance
445        if (LOG.isTraceEnabled()) {
446          LOG.trace(namespace + " remains in observance of quota.");
447        }
448      } else {
449        // Namespace quota is still in violation, need to enact if the table quota is not
450        // taking priority.
451        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
452          // Does a table policy exist
453          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
454            // Table-level quota violation policy is being applied here.
455            if (LOG.isTraceEnabled()) {
456              LOG.trace("Not activating Namespace violation policy because Table violation"
457                  + " policy is already in effect for " + tableInNS);
458            }
459          } else {
460            // No table policy, so enact namespace policy
461            LOG.info(tableInNS + " moving into violation of namespace space quota");
462            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
463          }
464        }
465      }
466    }
467  }
468
469  /**
470   * Removes region reports over a certain age.
471   */
472  void pruneOldRegionReports() {
473    final long now = EnvironmentEdgeManager.currentTime();
474    final long pruneTime = now - regionReportLifetimeMillis;
475    final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime,this);
476    if (LOG.isTraceEnabled()) {
477      LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
478          + pruneTime + ".");
479    }
480  }
481
482  /**
483   * Computes the set of all tables that have quotas defined. This includes tables with quotas
484   * explicitly set on them, in addition to tables that exist namespaces which have a quota
485   * defined.
486   */
487  TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
488    final Scan scan = QuotaTableUtil.makeScan(null);
489    final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
490    try (final QuotaRetriever scanner = new QuotaRetriever()) {
491      scanner.init(conn, scan);
492      for (QuotaSettings quotaSettings : scanner) {
493        // Only one of namespace and tablename should be 'null'
494        final String namespace = quotaSettings.getNamespace();
495        final TableName tableName = quotaSettings.getTableName();
496        if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
497          continue;
498        }
499
500        if (namespace != null) {
501          assert tableName == null;
502          // Collect all of the tables in the namespace
503          TableName[] tablesInNS = conn.getAdmin().listTableNamesByNamespace(namespace);
504          for (TableName tableUnderNs : tablesInNS) {
505            if (LOG.isTraceEnabled()) {
506              LOG.trace("Adding " + tableUnderNs + " under " +  namespace
507                  + " as having a namespace quota");
508            }
509            tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
510          }
511        } else {
512          assert tableName != null;
513          if (LOG.isTraceEnabled()) {
514            LOG.trace("Adding " + tableName + " as having table quota.");
515          }
516          // namespace is already null, must be a non-null tableName
517          tablesWithQuotas.addTableQuotaTable(tableName);
518        }
519      }
520      return tablesWithQuotas;
521    }
522  }
523
524  QuotaSnapshotStore<TableName> getTableSnapshotStore() {
525    return tableSnapshotStore;
526  }
527
528  QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
529    return namespaceSnapshotStore;
530  }
531
532  /**
533   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects
534   * for each HBase table with a quota defined.
535   */
536  public Map<TableName,SpaceQuotaSnapshot> getTableQuotaSnapshots() {
537    return readOnlyTableQuotaSnapshots;
538  }
539
540  /**
541   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects
542   * for each HBase namespace with a quota defined.
543   */
544  public Map<String,SpaceQuotaSnapshot> getNamespaceQuotaSnapshots() {
545    return readOnlyNamespaceSnapshots;
546  }
547
548  /**
549   * Fetches the {@link SpaceQuotaSnapshot} for the given table.
550   */
551  SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
552    SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
553    if (state == null) {
554      // No tracked state implies observance.
555      return QuotaSnapshotStore.NO_QUOTA;
556    }
557    return state;
558  }
559
560  /**
561   * Stores the quota state for the given table.
562   */
563  void setTableQuotaSnapshot(TableName table, SpaceQuotaSnapshot snapshot) {
564    this.tableQuotaSnapshots.put(table, snapshot);
565  }
566
567  /**
568   * Fetches the {@link SpaceQuotaSnapshot} for the given namespace from this chore.
569   */
570  SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
571    SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
572    if (state == null) {
573      // No tracked state implies observance.
574      return QuotaSnapshotStore.NO_QUOTA;
575    }
576    return state;
577  }
578
579  /**
580   * Stores the given {@code snapshot} for the given {@code namespace} in this chore.
581   */
582  void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
583    this.namespaceQuotaSnapshots.put(namespace, snapshot);
584  }
585
586  /**
587   * Extracts the period for the chore from the configuration.
588   *
589   * @param conf The configuration object.
590   * @return The configured chore period or the default value in the given timeunit.
591   * @see #getTimeUnit(Configuration)
592   */
593  static int getPeriod(Configuration conf) {
594    return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY,
595        QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
596  }
597
598  /**
599   * Extracts the initial delay for the chore from the configuration.
600   *
601   * @param conf The configuration object.
602   * @return The configured chore initial delay or the default value in the given timeunit.
603   * @see #getTimeUnit(Configuration)
604   */
605  static long getInitialDelay(Configuration conf) {
606    return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY,
607        QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
608  }
609
610  /**
611   * Extracts the time unit for the chore period and initial delay from the configuration. The
612   * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to
613   * a {@link TimeUnit} value.
614   *
615   * @param conf The configuration object.
616   * @return The configured time unit for the chore period and initial delay or the default value.
617   */
618  static TimeUnit getTimeUnit(Configuration conf) {
619    return TimeUnit.valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY,
620        QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
621  }
622
623  /**
624   * Extracts the percent of Regions for a table to have been reported to enable quota violation
625   * state change.
626   *
627   * @param conf The configuration object.
628   * @return The percent of regions reported to use.
629   */
630  static Double getRegionReportPercent(Configuration conf) {
631    return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
632        QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
633  }
634
635  /**
636   * A container which encapsulates the tables that have either a table quota or are contained in a
637   * namespace which have a namespace quota.
638   */
639  static class TablesWithQuotas {
640    private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
641    private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
642    private final Connection conn;
643    private final Configuration conf;
644
645    public TablesWithQuotas(Connection conn, Configuration conf) {
646      this.conn = Objects.requireNonNull(conn);
647      this.conf = Objects.requireNonNull(conf);
648    }
649
650    Configuration getConfiguration() {
651      return conf;
652    }
653
654    /**
655     * Adds a table with a table quota.
656     */
657    public void addTableQuotaTable(TableName tn) {
658      tablesWithTableQuotas.add(tn);
659    }
660
661    /**
662     * Adds a table with a namespace quota.
663     */
664    public void addNamespaceQuotaTable(TableName tn) {
665      tablesWithNamespaceQuotas.add(tn);
666    }
667
668    /**
669     * Returns true if the given table has a table quota.
670     */
671    public boolean hasTableQuota(TableName tn) {
672      return tablesWithTableQuotas.contains(tn);
673    }
674
675    /**
676     * Returns true if the table exists in a namespace with a namespace quota.
677     */
678    public boolean hasNamespaceQuota(TableName tn) {
679      return tablesWithNamespaceQuotas.contains(tn);
680    }
681
682    /**
683     * Returns an unmodifiable view of all tables with table quotas.
684     */
685    public Set<TableName> getTableQuotaTables() {
686      return Collections.unmodifiableSet(tablesWithTableQuotas);
687    }
688
689    /**
690     * Returns an unmodifiable view of all tables in namespaces that have
691     * namespace quotas.
692     */
693    public Set<TableName> getNamespaceQuotaTables() {
694      return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
695    }
696
697    public Set<String> getNamespacesWithQuotas() {
698      Set<String> namespaces = new HashSet<>();
699      for (TableName tn : tablesWithNamespaceQuotas) {
700        namespaces.add(tn.getNamespaceAsString());
701      }
702      return namespaces;
703    }
704
705    /**
706     * Returns a view of all tables that reside in a namespace with a namespace
707     * quota, grouped by the namespace itself.
708     */
709    public Multimap<String,TableName> getTablesByNamespace() {
710      Multimap<String,TableName> tablesByNS = HashMultimap.create();
711      for (TableName tn : tablesWithNamespaceQuotas) {
712        tablesByNS.put(tn.getNamespaceAsString(), tn);
713      }
714      return tablesByNS;
715    }
716
717    /**
718     * Filters out all tables for which the Master currently doesn't have enough region space
719     * reports received from RegionServers yet.
720     */
721    public Set<TableName> filterInsufficientlyReportedTables(
722        QuotaSnapshotStore<TableName> tableStore) throws IOException {
723      final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
724      Set<TableName> tablesToRemove = new HashSet<>();
725      for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
726        // Don't recompute a table we've already computed
727        if (tablesToRemove.contains(table)) {
728          continue;
729        }
730        final int numRegionsInTable = getNumRegions(table);
731        // If the table doesn't exist (no regions), bail out.
732        if (numRegionsInTable == 0) {
733          if (LOG.isTraceEnabled()) {
734            LOG.trace("Filtering " + table + " because no regions were reported");
735          }
736          tablesToRemove.add(table);
737          continue;
738        }
739        final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
740        final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
741        if (ratioReported < percentRegionsReportedThreshold) {
742          if (LOG.isTraceEnabled()) {
743            LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota  + " of " +
744                numRegionsInTable + " regions were reported.");
745          }
746          tablesToRemove.add(table);
747        } else if (LOG.isTraceEnabled()) {
748          LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota  + " of " +
749              numRegionsInTable + " regions were reported.");
750        }
751      }
752      for (TableName tableToRemove : tablesToRemove) {
753        tablesWithTableQuotas.remove(tableToRemove);
754        tablesWithNamespaceQuotas.remove(tableToRemove);
755      }
756      return tablesToRemove;
757    }
758
759    /**
760     * Computes the total number of regions in a table.
761     */
762    int getNumRegions(TableName table) throws IOException {
763      List<RegionInfo> regions = this.conn.getAdmin().getRegions(table);
764      if (regions == null) {
765        return 0;
766      }
767      // Filter the region replicas if any and return the original number of regions for a table.
768      RegionReplicaUtil.removeNonDefaultRegions(regions);
769      return regions.size();
770    }
771
772    /**
773     * Computes the number of regions reported for a table.
774     */
775    int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
776        throws IOException {
777      return Iterables.size(tableStore.filterBySubject(table));
778    }
779
780    @Override
781    public String toString() {
782      final StringBuilder sb = new StringBuilder(32);
783      sb.append(getClass().getSimpleName())
784          .append(": tablesWithTableQuotas=")
785          .append(this.tablesWithTableQuotas)
786          .append(", tablesWithNamespaceQuotas=")
787          .append(this.tablesWithNamespaceQuotas);
788      return sb.toString();
789    }
790  }
791}