001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.Objects;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.Map.Entry;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
030import org.apache.hadoop.hbase.regionserver.RegionServerServices;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
036
037import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
040
041/**
042 * A manager for filesystem space quotas in the RegionServer.
043 *
044 * This class is the centralized point for what a RegionServer knows about space quotas
045 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
046 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
047 * being violated). Both of these are sensitive on when they were last updated. The
048 * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates
049 * the state on <code>this</code>.
050 */
051@InterfaceAudience.Private
052public class RegionServerSpaceQuotaManager {
053  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);
054
055  private final RegionServerServices rsServices;
056
057  private SpaceQuotaRefresherChore spaceQuotaRefresher;
058  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
059  private boolean started = false;
060  private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
061  private SpaceViolationPolicyEnforcementFactory factory;
062  private RegionSizeStore regionSizeStore;
063  private RegionSizeReportingChore regionSizeReporter;
064
065  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
066    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
067  }
068
069  RegionServerSpaceQuotaManager(
070      RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
071    this.rsServices = Objects.requireNonNull(rsServices);
072    this.factory = factory;
073    this.enforcedPolicies = new ConcurrentHashMap<>();
074    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
075    // Initialize the size store to not track anything -- create the real one if we're start()'ed
076    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
077  }
078
079  public synchronized void start() throws IOException {
080    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
081      LOG.info("Quota support disabled, not starting space quota manager.");
082      return;
083    }
084
085    if (started) {
086      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
087      return;
088    }
089    // Start the chores
090    this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
091    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
092    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
093    rsServices.getChoreService().scheduleChore(regionSizeReporter);
094    // Instantiate the real RegionSizeStore
095    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
096    started = true;
097  }
098
099  public synchronized void stop() {
100    if (spaceQuotaRefresher != null) {
101      spaceQuotaRefresher.cancel();
102      spaceQuotaRefresher = null;
103    }
104    if (regionSizeReporter != null) {
105      regionSizeReporter.cancel();
106      regionSizeReporter = null;
107    }
108    started = false;
109  }
110
111  /**
112   * @return if the {@code Chore} has been started.
113   */
114  public boolean isStarted() {
115    return started;
116  }
117
118  /**
119   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
120   * of what the RegionServer thinks the table's utilization is.
121   */
122  public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
123    return new HashMap<>(currentQuotaSnapshots.get());
124  }
125
126  /**
127   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
128   *
129   * @param newSnapshots The space quota snapshots.
130   */
131  public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
132    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
133  }
134
135  /**
136   * Creates an object well-suited for the RegionServer to use in verifying active policies.
137   */
138  public ActivePolicyEnforcement getActiveEnforcements() {
139    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
140  }
141
142  /**
143   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
144   * {@link SpaceViolationPolicy}s.
145   */
146  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
147    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
148        copyActiveEnforcements();
149    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
150    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
151      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
152      if (snapshot != null) {
153        policies.put(entry.getKey(), snapshot);
154      }
155    }
156    return policies;
157  }
158
159  /**
160   * Enforces the given violationPolicy on the given table in this RegionServer.
161   */
162  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
163    SpaceQuotaStatus status = snapshot.getQuotaStatus();
164    if (!status.isInViolation()) {
165      throw new IllegalStateException(
166          tableName + " is not in violation. Violation policy should not be enabled.");
167    }
168    if (LOG.isTraceEnabled()) {
169      LOG.trace(
170          "Enabling violation policy enforcement on " + tableName
171          + " with policy " + status.getPolicy());
172    }
173    // Construct this outside of the lock
174    final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
175        getRegionServerServices(), tableName, snapshot);
176    // "Enables" the policy
177    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
178    // allow policy enable/disable on different tables to happen concurrently. As written now, only
179    // one table will be allowed to transition at a time. This is probably OK, but not sure if
180    // it would become a bottleneck at large clusters/number of tables.
181    synchronized (enforcedPolicies) {
182      try {
183        enforcement.enable();
184      } catch (IOException e) {
185        LOG.error("Failed to enable space violation policy for " + tableName
186            + ". This table will not enter violation.", e);
187        return;
188      }
189      enforcedPolicies.put(tableName, enforcement);
190    }
191  }
192
193  /**
194   * Disables enforcement on any violation policy on the given <code>tableName</code>.
195   */
196  public void disableViolationPolicyEnforcement(TableName tableName) {
197    if (LOG.isTraceEnabled()) {
198      LOG.trace("Disabling violation policy enforcement on " + tableName);
199    }
200    // "Disables" the policy
201    synchronized (enforcedPolicies) {
202      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
203      if (enforcement != null) {
204        try {
205          enforcement.disable();
206        } catch (IOException e) {
207          LOG.error("Failed to disable space violation policy for " + tableName
208              + ". This table will remain in violation.", e);
209          enforcedPolicies.put(tableName, enforcement);
210        }
211      }
212    }
213  }
214
215  /**
216   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
217   * a space quota violation policy. A convenience method.
218   *
219   * @param tableName The table to check
220   * @return True if compactions should be disabled for the table, false otherwise.
221   */
222  public boolean areCompactionsDisabled(TableName tableName) {
223    SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
224    if (enforcement != null) {
225      return enforcement.areCompactionsDisabled();
226    }
227    return false;
228  }
229
230  /**
231   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
232   *
233   * @return A {@link RegionSizeStore} implementation.
234   */
235  public RegionSizeStore getRegionSizeStore() {
236    return regionSizeStore;
237  }
238
239  /**
240   * Builds the protobuf message to inform the Master of files being archived.
241   *
242   * @param tn The table the files previously belonged to.
243   * @param archivedFiles The files and their size in bytes that were archived.
244   * @return The protobuf representation
245   */
246  public RegionServerStatusProtos.FileArchiveNotificationRequest buildFileArchiveRequest(
247      TableName tn, Collection<Entry<String,Long>> archivedFiles) {
248    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
249        RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
250    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
251    for (Entry<String,Long> archivedFile : archivedFiles) {
252      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
253          RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
254              .setName(archivedFile.getKey())
255              .setSize(archivedFile.getValue())
256              .setTableName(protoTn)
257              .build();
258      builder.addArchivedFiles(fws);
259    }
260    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
261    if (LOG.isTraceEnabled()) {
262      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
263    }
264    return request;
265  }
266
267  /**
268   * Returns the collection of tables which have quota violation policies enforced on
269   * this RegionServer.
270   */
271  Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
272    // Allows reads to happen concurrently (or while the map is being updated)
273    return new HashMap<>(this.enforcedPolicies);
274  }
275
276  RegionServerServices getRegionServerServices() {
277    return rsServices;
278  }
279
280  Connection getConnection() {
281    return rsServices.getConnection();
282  }
283
284  SpaceViolationPolicyEnforcementFactory getFactory() {
285    return factory;
286  }
287}