001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.Objects;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicReference;
025import java.util.Map.Entry;
026
027import org.apache.hadoop.hbase.TableName;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
033import org.apache.hadoop.hbase.regionserver.RegionServerServices;
034
035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
036
037/**
038 * A manager for filesystem space quotas in the RegionServer.
039 *
040 * This class is the centralized point for what a RegionServer knows about space quotas
041 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
042 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
043 * being violated). Both of these are sensitive on when they were last updated. The
044 * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates
045 * the state on <code>this</code>.
046 */
047@InterfaceAudience.Private
048public class RegionServerSpaceQuotaManager {
049  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);
050
051  private final RegionServerServices rsServices;
052
053  private SpaceQuotaRefresherChore spaceQuotaRefresher;
054  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
055  private boolean started = false;
056  private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
057  private SpaceViolationPolicyEnforcementFactory factory;
058
059  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
060    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
061  }
062
063  @VisibleForTesting
064  RegionServerSpaceQuotaManager(
065      RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
066    this.rsServices = Objects.requireNonNull(rsServices);
067    this.factory = factory;
068    this.enforcedPolicies = new ConcurrentHashMap<>();
069    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
070  }
071
072  public synchronized void start() throws IOException {
073    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
074      LOG.info("Quota support disabled, not starting space quota manager.");
075      return;
076    }
077
078    if (started) {
079      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
080      return;
081    }
082    this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
083    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
084    started = true;
085  }
086
087  public synchronized void stop() {
088    if (spaceQuotaRefresher != null) {
089      spaceQuotaRefresher.cancel();
090      spaceQuotaRefresher = null;
091    }
092    started = false;
093  }
094
095  /**
096   * @return if the {@code Chore} has been started.
097   */
098  public boolean isStarted() {
099    return started;
100  }
101
102  /**
103   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
104   * of what the RegionServer thinks the table's utilization is.
105   */
106  public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
107    return new HashMap<>(currentQuotaSnapshots.get());
108  }
109
110  /**
111   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
112   *
113   * @param newSnapshots The space quota snapshots.
114   */
115  public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
116    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
117  }
118
119  /**
120   * Creates an object well-suited for the RegionServer to use in verifying active policies.
121   */
122  public ActivePolicyEnforcement getActiveEnforcements() {
123    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
124  }
125
126  /**
127   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
128   * {@link SpaceViolationPolicy}s.
129   */
130  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
131    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
132        copyActiveEnforcements();
133    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
134    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
135      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
136      if (snapshot != null) {
137        policies.put(entry.getKey(), snapshot);
138      }
139    }
140    return policies;
141  }
142
143  /**
144   * Enforces the given violationPolicy on the given table in this RegionServer.
145   */
146  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
147    SpaceQuotaStatus status = snapshot.getQuotaStatus();
148    if (!status.isInViolation()) {
149      throw new IllegalStateException(
150          tableName + " is not in violation. Violation policy should not be enabled.");
151    }
152    if (LOG.isTraceEnabled()) {
153      LOG.trace(
154          "Enabling violation policy enforcement on " + tableName
155          + " with policy " + status.getPolicy());
156    }
157    // Construct this outside of the lock
158    final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
159        getRegionServerServices(), tableName, snapshot);
160    // "Enables" the policy
161    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
162    // allow policy enable/disable on different tables to happen concurrently. As written now, only
163    // one table will be allowed to transition at a time. This is probably OK, but not sure if
164    // it would become a bottleneck at large clusters/number of tables.
165    synchronized (enforcedPolicies) {
166      try {
167        enforcement.enable();
168      } catch (IOException e) {
169        LOG.error("Failed to enable space violation policy for " + tableName
170            + ". This table will not enter violation.", e);
171        return;
172      }
173      enforcedPolicies.put(tableName, enforcement);
174    }
175  }
176
177  /**
178   * Disables enforcement on any violation policy on the given <code>tableName</code>.
179   */
180  public void disableViolationPolicyEnforcement(TableName tableName) {
181    if (LOG.isTraceEnabled()) {
182      LOG.trace("Disabling violation policy enforcement on " + tableName);
183    }
184    // "Disables" the policy
185    synchronized (enforcedPolicies) {
186      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
187      if (enforcement != null) {
188        try {
189          enforcement.disable();
190        } catch (IOException e) {
191          LOG.error("Failed to disable space violation policy for " + tableName
192              + ". This table will remain in violation.", e);
193          enforcedPolicies.put(tableName, enforcement);
194        }
195      }
196    }
197  }
198
199  /**
200   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
201   * a space quota violation policy. A convenience method.
202   *
203   * @param tableName The table to check
204   * @return True if compactions should be disabled for the table, false otherwise.
205   */
206  public boolean areCompactionsDisabled(TableName tableName) {
207    SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
208    if (enforcement != null) {
209      return enforcement.areCompactionsDisabled();
210    }
211    return false;
212  }
213
214  /**
215   * Returns the collection of tables which have quota violation policies enforced on
216   * this RegionServer.
217   */
218  Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
219    // Allows reads to happen concurrently (or while the map is being updated)
220    return new HashMap<>(this.enforcedPolicies);
221  }
222
223  RegionServerServices getRegionServerServices() {
224    return rsServices;
225  }
226
227  Connection getConnection() {
228    return rsServices.getConnection();
229  }
230
231  SpaceViolationPolicyEnforcementFactory getFactory() {
232    return factory;
233  }
234}