001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.Objects;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.Map.Entry;
027
028import org.apache.hadoop.hbase.TableName;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
034import org.apache.hadoop.hbase.regionserver.RegionServerServices;
035
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
038
039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
042
043/**
044 * A manager for filesystem space quotas in the RegionServer.
045 *
046 * This class is the centralized point for what a RegionServer knows about space quotas
047 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
048 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
049 * being violated). Both of these are sensitive on when they were last updated. The
050 * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates
051 * the state on <code>this</code>.
052 */
053@InterfaceAudience.Private
054public class RegionServerSpaceQuotaManager {
055  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);
056
057  private final RegionServerServices rsServices;
058
059  private SpaceQuotaRefresherChore spaceQuotaRefresher;
060  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
061  private boolean started = false;
062  private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
063  private SpaceViolationPolicyEnforcementFactory factory;
064  private RegionSizeStore regionSizeStore;
065  private RegionSizeReportingChore regionSizeReporter;
066
067  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
068    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
069  }
070
071  @VisibleForTesting
072  RegionServerSpaceQuotaManager(
073      RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
074    this.rsServices = Objects.requireNonNull(rsServices);
075    this.factory = factory;
076    this.enforcedPolicies = new ConcurrentHashMap<>();
077    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
078    // Initialize the size store to not track anything -- create the real one if we're start()'ed
079    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
080  }
081
082  public synchronized void start() throws IOException {
083    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
084      LOG.info("Quota support disabled, not starting space quota manager.");
085      return;
086    }
087
088    if (started) {
089      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
090      return;
091    }
092    // Start the chores
093    this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
094    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
095    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
096    rsServices.getChoreService().scheduleChore(regionSizeReporter);
097    // Instantiate the real RegionSizeStore
098    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
099    started = true;
100  }
101
102  public synchronized void stop() {
103    if (spaceQuotaRefresher != null) {
104      spaceQuotaRefresher.cancel();
105      spaceQuotaRefresher = null;
106    }
107    if (regionSizeReporter != null) {
108      regionSizeReporter.cancel();
109      regionSizeReporter = null;
110    }
111    started = false;
112  }
113
114  /**
115   * @return if the {@code Chore} has been started.
116   */
117  public boolean isStarted() {
118    return started;
119  }
120
121  /**
122   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
123   * of what the RegionServer thinks the table's utilization is.
124   */
125  public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
126    return new HashMap<>(currentQuotaSnapshots.get());
127  }
128
129  /**
130   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
131   *
132   * @param newSnapshots The space quota snapshots.
133   */
134  public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
135    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
136  }
137
138  /**
139   * Creates an object well-suited for the RegionServer to use in verifying active policies.
140   */
141  public ActivePolicyEnforcement getActiveEnforcements() {
142    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
143  }
144
145  /**
146   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
147   * {@link SpaceViolationPolicy}s.
148   */
149  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
150    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
151        copyActiveEnforcements();
152    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
153    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
154      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
155      if (snapshot != null) {
156        policies.put(entry.getKey(), snapshot);
157      }
158    }
159    return policies;
160  }
161
162  /**
163   * Enforces the given violationPolicy on the given table in this RegionServer.
164   */
165  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
166    SpaceQuotaStatus status = snapshot.getQuotaStatus();
167    if (!status.isInViolation()) {
168      throw new IllegalStateException(
169          tableName + " is not in violation. Violation policy should not be enabled.");
170    }
171    if (LOG.isTraceEnabled()) {
172      LOG.trace(
173          "Enabling violation policy enforcement on " + tableName
174          + " with policy " + status.getPolicy());
175    }
176    // Construct this outside of the lock
177    final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
178        getRegionServerServices(), tableName, snapshot);
179    // "Enables" the policy
180    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
181    // allow policy enable/disable on different tables to happen concurrently. As written now, only
182    // one table will be allowed to transition at a time. This is probably OK, but not sure if
183    // it would become a bottleneck at large clusters/number of tables.
184    synchronized (enforcedPolicies) {
185      try {
186        enforcement.enable();
187      } catch (IOException e) {
188        LOG.error("Failed to enable space violation policy for " + tableName
189            + ". This table will not enter violation.", e);
190        return;
191      }
192      enforcedPolicies.put(tableName, enforcement);
193    }
194  }
195
196  /**
197   * Disables enforcement on any violation policy on the given <code>tableName</code>.
198   */
199  public void disableViolationPolicyEnforcement(TableName tableName) {
200    if (LOG.isTraceEnabled()) {
201      LOG.trace("Disabling violation policy enforcement on " + tableName);
202    }
203    // "Disables" the policy
204    synchronized (enforcedPolicies) {
205      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
206      if (enforcement != null) {
207        try {
208          enforcement.disable();
209        } catch (IOException e) {
210          LOG.error("Failed to disable space violation policy for " + tableName
211              + ". This table will remain in violation.", e);
212          enforcedPolicies.put(tableName, enforcement);
213        }
214      }
215    }
216  }
217
218  /**
219   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
220   * a space quota violation policy. A convenience method.
221   *
222   * @param tableName The table to check
223   * @return True if compactions should be disabled for the table, false otherwise.
224   */
225  public boolean areCompactionsDisabled(TableName tableName) {
226    SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
227    if (enforcement != null) {
228      return enforcement.areCompactionsDisabled();
229    }
230    return false;
231  }
232
233  /**
234   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
235   *
236   * @return A {@link RegionSizeStore} implementation.
237   */
238  public RegionSizeStore getRegionSizeStore() {
239    return regionSizeStore;
240  }
241
242  /**
243   * Builds the protobuf message to inform the Master of files being archived.
244   *
245   * @param tn The table the files previously belonged to.
246   * @param archivedFiles The files and their size in bytes that were archived.
247   * @return The protobuf representation
248   */
249  public RegionServerStatusProtos.FileArchiveNotificationRequest buildFileArchiveRequest(
250      TableName tn, Collection<Entry<String,Long>> archivedFiles) {
251    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
252        RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
253    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
254    for (Entry<String,Long> archivedFile : archivedFiles) {
255      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
256          RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
257              .setName(archivedFile.getKey())
258              .setSize(archivedFile.getValue())
259              .setTableName(protoTn)
260              .build();
261      builder.addArchivedFiles(fws);
262    }
263    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
264    if (LOG.isTraceEnabled()) {
265      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
266    }
267    return request;
268  }
269
270  /**
271   * Returns the collection of tables which have quota violation policies enforced on
272   * this RegionServer.
273   */
274  Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
275    // Allows reads to happen concurrently (or while the map is being updated)
276    return new HashMap<>(this.enforcedPolicies);
277  }
278
279  RegionServerServices getRegionServerServices() {
280    return rsServices;
281  }
282
283  Connection getConnection() {
284    return rsServices.getConnection();
285  }
286
287  SpaceViolationPolicyEnforcementFactory getFactory() {
288    return factory;
289  }
290}