001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.Objects;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
031import org.apache.hadoop.hbase.regionserver.RegionServerServices;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
037
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
041
042/**
043 * A manager for filesystem space quotas in the RegionServer. This class is the centralized point
044 * for what a RegionServer knows about space quotas on tables. For each table, it tracks two
045 * different things: the {@link SpaceQuotaSnapshot} and a {@link SpaceViolationPolicyEnforcement}
046 * (which may be null when a quota is not being violated). Both of these are sensitive on when they
047 * were last updated. The {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and
048 * updates the state on <code>this</code>.
049 */
050@InterfaceAudience.Private
051public class RegionServerSpaceQuotaManager {
052  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);
053
054  private final RegionServerServices rsServices;
055
056  private SpaceQuotaRefresherChore spaceQuotaRefresher;
057  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
058  private boolean started = false;
059  private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies;
060  private SpaceViolationPolicyEnforcementFactory factory;
061  private RegionSizeStore regionSizeStore;
062  private RegionSizeReportingChore regionSizeReporter;
063
064  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
065    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
066  }
067
068  RegionServerSpaceQuotaManager(RegionServerServices rsServices,
069    SpaceViolationPolicyEnforcementFactory factory) {
070    this.rsServices = Objects.requireNonNull(rsServices);
071    this.factory = factory;
072    this.enforcedPolicies = new ConcurrentHashMap<>();
073    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
074    // Initialize the size store to not track anything -- create the real one if we're start()'ed
075    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
076  }
077
078  public synchronized void start() throws IOException {
079    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
080      LOG.info("Quota support disabled, not starting space quota manager.");
081      return;
082    }
083
084    if (started) {
085      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
086      return;
087    }
088    // Start the chores
089    this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection());
090    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
091    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
092    rsServices.getChoreService().scheduleChore(regionSizeReporter);
093    // Instantiate the real RegionSizeStore
094    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
095    started = true;
096  }
097
098  public synchronized void stop() {
099    if (spaceQuotaRefresher != null) {
100      spaceQuotaRefresher.shutdown();
101      spaceQuotaRefresher = null;
102    }
103    if (regionSizeReporter != null) {
104      regionSizeReporter.shutdown();
105      regionSizeReporter = null;
106    }
107    started = false;
108  }
109
110  /** Returns if the {@code Chore} has been started. */
111  public boolean isStarted() {
112    return started;
113  }
114
115  /**
116   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view of what the
117   * RegionServer thinks the table's utilization is.
118   */
119  public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() {
120    return new HashMap<>(currentQuotaSnapshots.get());
121  }
122
123  /**
124   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
125   * @param newSnapshots The space quota snapshots.
126   */
127  public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) {
128    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
129  }
130
131  /**
132   * Creates an object well-suited for the RegionServer to use in verifying active policies.
133   */
134  public ActivePolicyEnforcement getActiveEnforcements() {
135    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
136  }
137
138  /**
139   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
140   * {@link SpaceViolationPolicy}s.
141   */
142  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
143    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = copyActiveEnforcements();
144    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
145    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
146      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
147      if (snapshot != null) {
148        policies.put(entry.getKey(), snapshot);
149      }
150    }
151    return policies;
152  }
153
154  /**
155   * Enforces the given violationPolicy on the given table in this RegionServer.
156   */
157  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
158    SpaceQuotaStatus status = snapshot.getQuotaStatus();
159    if (!status.isInViolation()) {
160      throw new IllegalStateException(
161        tableName + " is not in violation. Violation policy should not be enabled.");
162    }
163    if (LOG.isTraceEnabled()) {
164      LOG.trace("Enabling violation policy enforcement on " + tableName + " with policy "
165        + status.getPolicy());
166    }
167    // Construct this outside of the lock
168    final SpaceViolationPolicyEnforcement enforcement =
169      getFactory().create(getRegionServerServices(), tableName, snapshot);
170    // "Enables" the policy
171    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
172    // allow policy enable/disable on different tables to happen concurrently. As written now, only
173    // one table will be allowed to transition at a time. This is probably OK, but not sure if
174    // it would become a bottleneck at large clusters/number of tables.
175    synchronized (enforcedPolicies) {
176      try {
177        enforcement.enable();
178      } catch (IOException e) {
179        LOG.error("Failed to enable space violation policy for " + tableName
180          + ". This table will not enter violation.", e);
181        return;
182      }
183      enforcedPolicies.put(tableName, enforcement);
184    }
185  }
186
187  /**
188   * Disables enforcement on any violation policy on the given <code>tableName</code>.
189   */
190  public void disableViolationPolicyEnforcement(TableName tableName) {
191    if (LOG.isTraceEnabled()) {
192      LOG.trace("Disabling violation policy enforcement on " + tableName);
193    }
194    // "Disables" the policy
195    synchronized (enforcedPolicies) {
196      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
197      if (enforcement != null) {
198        try {
199          enforcement.disable();
200        } catch (IOException e) {
201          LOG.error("Failed to disable space violation policy for " + tableName
202            + ". This table will remain in violation.", e);
203          enforcedPolicies.put(tableName, enforcement);
204        }
205      }
206    }
207  }
208
209  /**
210   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
211   * a space quota violation policy. A convenience method.
212   * @param tableName The table to check
213   * @return True if compactions should be disabled for the table, false otherwise.
214   */
215  public boolean areCompactionsDisabled(TableName tableName) {
216    SpaceViolationPolicyEnforcement enforcement =
217      this.enforcedPolicies.get(Objects.requireNonNull(tableName));
218    if (enforcement != null) {
219      return enforcement.areCompactionsDisabled();
220    }
221    return false;
222  }
223
224  /**
225   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
226   * @return A {@link RegionSizeStore} implementation.
227   */
228  public RegionSizeStore getRegionSizeStore() {
229    return regionSizeStore;
230  }
231
232  /**
233   * Builds the protobuf message to inform the Master of files being archived.
234   * @param tn            The table the files previously belonged to.
235   * @param archivedFiles The files and their size in bytes that were archived.
236   * @return The protobuf representation
237   */
238  public RegionServerStatusProtos.FileArchiveNotificationRequest
239    buildFileArchiveRequest(TableName tn, Collection<Entry<String, Long>> archivedFiles) {
240    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
241      RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
242    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
243    for (Entry<String, Long> archivedFile : archivedFiles) {
244      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
245        RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
246          .setName(archivedFile.getKey()).setSize(archivedFile.getValue()).setTableName(protoTn)
247          .build();
248      builder.addArchivedFiles(fws);
249    }
250    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
251    if (LOG.isTraceEnabled()) {
252      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
253    }
254    return request;
255  }
256
257  /**
258   * Returns the collection of tables which have quota violation policies enforced on this
259   * RegionServer.
260   */
261  Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
262    // Allows reads to happen concurrently (or while the map is being updated)
263    return new HashMap<>(this.enforcedPolicies);
264  }
265
266  RegionServerServices getRegionServerServices() {
267    return rsServices;
268  }
269
270  Connection getConnection() {
271    return rsServices.getConnection();
272  }
273
274  SpaceViolationPolicyEnforcementFactory getFactory() {
275    return factory;
276  }
277}